Template PostgreSQL ke BigQuery

Template PostgreSQL ke BigQuery adalah pipeline batch yang menyalin data dari tabel PostgreSQL ke tabel BigQuery yang ada. Pipeline ini menggunakan JDBC untuk terhubung ke PostgreSQL. Untuk lapisan perlindungan tambahan, Anda juga dapat meneruskan kunci Cloud KMS beserta parameter string koneksi, nama pengguna, dan sandi yang dienkode Base64 yang dienkripsi dengan kunci Cloud KMS. Untuk mengetahui informasi selengkapnya tentang mengenkripsi parameter nama pengguna, sandi, dan string koneksi, lihat endpoint enkripsi Cloud KMS API.

Persyaratan pipeline

  • Tabel BigQuery harus sudah ada sebelum eksekusi pipeline.
  • Tabel BigQuery harus memiliki skema yang kompatibel.
  • Database relasional harus dapat diakses dari subnet tempat Dataflow berjalan.

Parameter template

Parameter yang diperlukan

  • driverJars: Daftar file JAR driver yang dipisahkan koma. Contoh, gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar.
  • driverClassName: Nama class driver JDBC. Contoh, com.mysql.jdbc.Driver.
  • connectionURL: String URL koneksi JDBC. Contoh, jdbc:mysql://some-host:3306/sampledb. Anda dapat meneruskan nilai ini sebagai string yang dienkripsi dengan kunci Cloud KMS, lalu dienkode dengan Base64. Menghapus karakter spasi kosong dari string yang dienkode Base64. Perhatikan perbedaan antara string koneksi database non-RAC Oracle (jdbc:oracle:thin:@some-host:<port>:<sid>) dan string koneksi database Oracle RAC (jdbc:oracle:thin:@//some-host[:<port>]/<service_name>). Misalnya, jdbc:mysql://some-host:3306/sampledb.
  • outputTable: Lokasi tabel output BigQuery. Contoh, <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>.
  • bigQueryLoadingTemporaryDirectory: Direktori sementara untuk proses pemuatan BigQuery. Contoh, gs://your-bucket/your-files/temp_dir.

Parameter opsional

  • connectionProperties: String properti yang akan digunakan untuk koneksi JDBC. Format string harus [propertyName=property;]*.Untuk informasi selengkapnya, lihat Properti Konfigurasi (https://dev.mysql.com/doc/connector-j/en/connector-j-reference-configuration-properties.html) dalam dokumentasi MySQL. Contohnya, unicode=true;characterEncoding=UTF-8
  • username: Nama pengguna yang akan digunakan untuk koneksi JDBC. Dapat diteruskan sebagai string yang dienkripsi dengan kunci Cloud KMS, atau dapat berupa secret Secret Manager dalam bentuk projects/{project}/secrets/{secret}/versions/{secret_version}.
  • password: Sandi yang akan digunakan untuk koneksi JDBC. Dapat diteruskan sebagai string yang dienkripsi dengan kunci Cloud KMS, atau dapat berupa secret Secret Manager dalam bentuk projects/{project}/secrets/{secret}/versions/{secret_version}.
  • kueri: Kueri yang akan dijalankan di sumber untuk mengekstrak data. Perhatikan bahwa beberapa jenis JDBC SQL dan BigQuery, meskipun memiliki nama yang sama, memiliki beberapa perbedaan. Beberapa pemetaan jenis SQL -> BigQuery yang penting untuk diingat adalah DATETIME --> TIMESTAMP. Pemrosesan jenis mungkin diperlukan jika skema Anda tidak cocok. Misalnya, select * from sampledb.sample_table.
  • KMSEncryptionKey: Kunci enkripsi Cloud KMS yang akan digunakan untuk mendekripsi nama pengguna, sandi, dan string koneksi. Jika meneruskan kunci Cloud KMS, Anda juga harus mengenkripsi nama pengguna, sandi, dan string koneksi. Contoh, projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key.
  • useColumnAlias: Jika ditetapkan ke true, pipeline akan menggunakan alias kolom (AS), bukan nama kolom, untuk memetakan baris ke BigQuery. Setelan defaultnya adalah false.
  • isTruncate: Jika ditetapkan ke true, pipeline akan terpotong sebelum memuat data ke BigQuery. Defaultnya adalah false, yang menyebabkan pipeline menambahkan data.
  • partitionColumn: Jika parameter ini diberikan dengan nama table yang ditentukan sebagai parameter opsional, JdbcIO akan membaca tabel secara paralel dengan menjalankan beberapa instance kueri pada tabel yang sama (subkueri) menggunakan rentang. Saat ini, hanya mendukung kolom partisi Long.
  • table: Tabel yang akan dibaca saat menggunakan partisi. Parameter ini juga menerima subkueri dalam tanda kurung. Contoh, (select id, name from Person) as subq.
  • numPartitions: Jumlah partisi. Dengan batas bawah dan atas, nilai ini membentuk langkah partisi untuk ekspresi klausa WHERE yang dihasilkan yang digunakan untuk membagi kolom partisi secara merata. Jika input kurang dari 1, angka akan ditetapkan ke 1.
  • lowerBound: Batas bawah yang akan digunakan dalam skema partisi. Jika tidak diberikan, nilai ini akan otomatis disimpulkan oleh Apache Beam untuk jenis yang didukung.
  • upperBound: Batas atas yang akan digunakan dalam skema partisi. Jika tidak diberikan, nilai ini akan otomatis disimpulkan oleh Apache Beam untuk jenis yang didukung.
  • fetchSize: Jumlah baris yang akan diambil dari database dalam satu waktu. Tidak digunakan untuk pembacaan yang dipartisi. Setelan defaultnya adalah: 50.000.
  • createDisposition: CreateDisposition BigQuery yang akan digunakan. Misalnya CREATE_IF_NEEDED atau CREATE_NEVER. Defaultnya adalah: CREATE_NEVER.
  • bigQuerySchemaPath: Jalur Cloud Storage untuk skema JSON BigQuery. Jika createDisposition ditetapkan ke CREATE_IF_NEEDED, parameter ini harus ditentukan. Contohnya, gs://your-bucket/your-schema.json
  • outputDeadletterTable: Tabel BigQuery yang akan digunakan untuk pesan yang gagal mencapai tabel output, yang diformat sebagai "PROJECT_ID:DATASET_NAME.TABLE_NAME". Jika tidak ada, tabel akan dibuat saat pipeline berjalan. Jika parameter ini tidak ditentukan, pipeline akan gagal saat terjadi error tulis.Parameter ini hanya dapat ditentukan jika useStorageWriteApi atau useStorageWriteApiAtLeastOnce ditetapkan ke benar (true).
  • disabledAlgorithms: Algoritma yang dipisahkan koma untuk dinonaktifkan. Jika nilai ini disetel ke none, tidak ada algoritma yang dinonaktifkan. Gunakan parameter ini dengan hati-hati, karena algoritma yang dinonaktifkan secara default mungkin memiliki kerentanan atau masalah performa. Misalnya, SSLv3, RC4.
  • extraFilesToStage: Jalur Cloud Storage yang dipisahkan koma atau secret Secret Manager untuk file yang akan di-stage di pekerja. File ini disimpan di direktori /extra_files di setiap pekerja. Contoh, gs://<BUCKET_NAME>/file.txt,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<VERSION_ID>.
  • useStorageWriteApi: Jika true, pipeline akan menggunakan BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api). Nilai defaultnya adalah false. Untuk informasi selengkapnya, lihat Menggunakan Storage Write API (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • useStorageWriteApiAtLeastOnce: Saat menggunakan Storage Write API, menentukan semantik tulis. Untuk menggunakan semantik minimal satu kali (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), tetapkan parameter ini ke true. Untuk menggunakan semantik tepat satu kali, tetapkan parameter ke false. Parameter ini hanya berlaku jika useStorageWriteApi adalah true. Nilai defaultnya adalah false.

Menjalankan template

  1. Buka halaman Create job from template Dataflow.
  2. Buka Buat tugas dari template
  3. Di kolom Nama tugas, masukkan nama tugas yang unik.
  4. Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region defaultnya adalah us-central1.

    Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

  5. Dari menu drop-down Dataflow template, pilih the PostgreSQL to BigQuery template.
  6. Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
  7. Klik Run job.

Di shell atau terminal, jalankan template:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PostgreSQL_to_BigQuery \
    --parameters \
connectionURL=JDBC_CONNECTION_URL,\
query=SOURCE_SQL_QUERY,\
outputTable=PROJECT_ID:DATASET.TABLE_NAME,
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS,\
connectionProperties=CONNECTION_PROPERTIES,\
username=CONNECTION_USERNAME,\
password=CONNECTION_PASSWORD,\
KMSEncryptionKey=KMS_ENCRYPTION_KEY

Ganti kode berikut:

  • JOB_NAME: nama tugas unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • JDBC_CONNECTION_URL: URL koneksi JDBC
  • SOURCE_SQL_QUERY: kueri SQL yang akan dijalankan di database sumber
  • DATASET: set data BigQuery Anda
  • TABLE_NAME: nama tabel BigQuery Anda
  • PATH_TO_TEMP_DIR_ON_GCS: jalur Cloud Storage Anda ke direktori sementara
  • CONNECTION_PROPERTIES: properti koneksi JDBC, jika diperlukan
  • CONNECTION_USERNAME: nama pengguna koneksi JDBC
  • CONNECTION_PASSWORD: sandi koneksi JDBC
  • KMS_ENCRYPTION_KEY: kunci enkripsi Cloud KMS

Untuk menjalankan template menggunakan REST API, kirim permintaan POST HTTP. Untuk mengetahui informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launchParameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PostgreSQL_to_BigQuery"
    "parameters": {
      "connectionURL": "JDBC_CONNECTION_URL",
      "query": "SOURCE_SQL_QUERY",
      "outputTable": "PROJECT_ID:DATASET.TABLE_NAME",
      "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS",
      "connectionProperties": "CONNECTION_PROPERTIES",
      "username": "CONNECTION_USERNAME",
      "password": "CONNECTION_PASSWORD",
      "KMSEncryptionKey":"KMS_ENCRYPTION_KEY"
    },
    "environment": { "zone": "us-central1-f" }
  }
}

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama tugas unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • LOCATION: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • JDBC_CONNECTION_URL: URL koneksi JDBC
  • SOURCE_SQL_QUERY: kueri SQL yang akan dijalankan di database sumber
  • DATASET: set data BigQuery Anda
  • TABLE_NAME: nama tabel BigQuery Anda
  • PATH_TO_TEMP_DIR_ON_GCS: jalur Cloud Storage Anda ke direktori sementara
  • CONNECTION_PROPERTIES: properti koneksi JDBC, jika diperlukan
  • CONNECTION_USERNAME: nama pengguna koneksi JDBC
  • CONNECTION_PASSWORD: sandi koneksi JDBC
  • KMS_ENCRYPTION_KEY: kunci enkripsi Cloud KMS
Java
/*
 * Copyright (C) 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.transforms.BigQueryConverters.wrapBigQueryInsertError;
import static com.google.cloud.teleport.v2.utils.GCSUtils.getGcsFileAsString;
import static com.google.cloud.teleport.v2.utils.KMSUtils.maybeDecrypt;

import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.JdbcToBigQueryOptions;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.cloud.teleport.v2.utils.GCSAwareValueProvider;
import com.google.cloud.teleport.v2.utils.JdbcConverters;
import com.google.cloud.teleport.v2.utils.ResourceUtils;
import com.google.cloud.teleport.v2.utils.SecretManagerUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

/**
 * A template that copies data from a relational database using JDBC to an existing BigQuery table.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/jdbc-to-googlecloud/README_Jdbc_to_BigQuery_Flex.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Jdbc_to_BigQuery_Flex",
    category = TemplateCategory.BATCH,
    displayName = "JDBC to BigQuery with BigQuery Storage API support",
    description = {
      "The JDBC to BigQuery template is a batch pipeline that copies data from a relational database table into an existing BigQuery table. "
          + "This pipeline uses JDBC to connect to the relational database. You can use this template to copy data from any relational database with available JDBC drivers into BigQuery.",
      "For an extra layer of protection, you can also pass in a Cloud KMS key along with a Base64-encoded username, password, and connection string parameters encrypted with the Cloud KMS key. "
          + "See the <a href=\"https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt\">Cloud KMS API encryption endpoint</a> for additional details on encrypting your username, password, and connection string parameters."
    },
    optionsClass = JdbcToBigQueryOptions.class,
    flexContainerName = "jdbc-to-bigquery",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/jdbc-to-bigquery",
    contactInformation = "https://cloud.google.com/support",
    preview = true,
    requirements = {
      "The JDBC drivers for the relational database must be available.",
      "If BigQuery table already exist before pipeline execution, it must have a compatible schema.",
      "The relational database must be accessible from the subnet where Dataflow runs."
    })
public class JdbcToBigQuery {

  /** Coder for FailsafeElement. */
  private static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  /**
   * Main entry point for executing the pipeline. This will run the pipeline asynchronously. If
   * blocking execution is required, use the {@link JdbcToBigQuery#run} method to start the pipeline
   * and invoke {@code result.waitUntilFinish()} on the {@link PipelineResult}.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    // Parse the user options passed from the command-line
    JdbcToBigQueryOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(JdbcToBigQueryOptions.class);

    run(options, writeToBQTransform(options));
  }

  /**
   * Create the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @param writeToBQ the transform that outputs {@link TableRow}s to BigQuery.
   * @return The result of the pipeline execution.
   */
  @VisibleForTesting
  static PipelineResult run(JdbcToBigQueryOptions options, Write<TableRow> writeToBQ) {
    // Validate BQ STORAGE_WRITE_API options
    BigQueryIOUtils.validateBQStorageApiOptionsBatch(options);
    if (!options.getUseStorageWriteApi()
        && !options.getUseStorageWriteApiAtLeastOnce()
        && !Strings.isNullOrEmpty(options.getOutputDeadletterTable())) {
      throw new IllegalArgumentException(
          "outputDeadletterTable can only be specified if BigQuery Storage Write API is enabled either with useStorageWriteApi or useStorageWriteApiAtLeastOnce.");
    }

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    /*
     * Steps: 1) Read records via JDBC and convert to TableRow via RowMapper
     *        2) Append TableRow to BigQuery via BigQueryIO
     */
    JdbcIO.DataSourceConfiguration dataSourceConfiguration =
        JdbcIO.DataSourceConfiguration.create(
                StaticValueProvider.of(options.getDriverClassName()),
                maybeDecrypt(
                    maybeParseSecret(options.getConnectionURL()), options.getKMSEncryptionKey()))
            .withUsername(
                maybeDecrypt(
                    maybeParseSecret(options.getUsername()), options.getKMSEncryptionKey()))
            .withPassword(
                maybeDecrypt(
                    maybeParseSecret(options.getPassword()), options.getKMSEncryptionKey()));

    if (options.getDriverJars() != null) {
      dataSourceConfiguration = dataSourceConfiguration.withDriverJars(options.getDriverJars());
    }

    if (options.getConnectionProperties() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withConnectionProperties(options.getConnectionProperties());
    }

    /*
     * Step 1: Read records via JDBC and convert to TableRow
     *         via {@link org.apache.beam.sdk.io.jdbc.JdbcIO.RowMapper}
     */
    PCollection<TableRow> rows;
    if (options.getPartitionColumn() != null && options.getTable() != null) {
      // Read with Partitions
      // TODO(pranavbhandari): Support readWithPartitions for other data types.
      JdbcIO.ReadWithPartitions<TableRow, Long> readIO =
          JdbcIO.<TableRow>readWithPartitions()
              .withDataSourceConfiguration(dataSourceConfiguration)
              .withTable(options.getTable())
              .withPartitionColumn(options.getPartitionColumn())
              .withRowMapper(JdbcConverters.getResultSetToTableRow(options.getUseColumnAlias()));
      if (options.getNumPartitions() != null) {
        readIO = readIO.withNumPartitions(options.getNumPartitions());
      }
      if (options.getLowerBound() != null && options.getUpperBound() != null) {
        readIO =
            readIO.withLowerBound(options.getLowerBound()).withUpperBound(options.getUpperBound());
      }

      if (options.getFetchSize() != null && options.getFetchSize() > 0) {
        readIO = readIO.withFetchSize(options.getFetchSize());
      }

      rows = pipeline.apply("Read from JDBC with Partitions", readIO);
    } else {
      if (options.getQuery() == null) {
        throw new IllegalArgumentException(
            "Either 'query' or both 'table' AND 'PartitionColumn' must be specified to read from JDBC");
      }
      JdbcIO.Read<TableRow> readIO =
          JdbcIO.<TableRow>read()
              .withDataSourceConfiguration(dataSourceConfiguration)
              .withQuery(new GCSAwareValueProvider(options.getQuery()))
              .withCoder(TableRowJsonCoder.of())
              .withRowMapper(JdbcConverters.getResultSetToTableRow(options.getUseColumnAlias()));

      if (options.getFetchSize() != null && options.getFetchSize() > 0) {
        readIO = readIO.withFetchSize(options.getFetchSize());
      }

      rows = pipeline.apply("Read from JdbcIO", readIO);
    }

    /*
     * Step 2: Append TableRow to an existing BigQuery table
     */
    WriteResult writeResult = rows.apply("Write to BigQuery", writeToBQ);

    /*
     * Step 3.
     * If using Storage Write API, capture failed inserts and either
     *   a) write error rows to DLQ
     *   b) fail the pipeline
     */
    if (options.getUseStorageWriteApi() || options.getUseStorageWriteApiAtLeastOnce()) {
      PCollection<BigQueryInsertError> insertErrors =
          BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options);

      if (!Strings.isNullOrEmpty(options.getOutputDeadletterTable())) {
        /*
         * Step 3a.
         * Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
         */
        PCollection<FailsafeElement<String, String>> failedInserts =
            insertErrors
                .apply(
                    "WrapInsertionErrors",
                    MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                        .via((BigQueryInsertError e) -> wrapBigQueryInsertError(e)))
                .setCoder(FAILSAFE_ELEMENT_CODER);

        /*
         * Step 3a Contd.
         * Insert records that failed insert into deadletter table
         */
        failedInserts.apply(
            "WriteFailedRecords",
            ErrorConverters.WriteStringMessageErrors.newBuilder()
                .setErrorRecordsTable(options.getOutputDeadletterTable())
                .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
                .setUseWindowedTimestamp(false)
                .build());
      } else {
        /*
         * Step 3b.
         * Fail pipeline upon write errors if no DLQ was specified
         */
        insertErrors.apply(ParDo.of(new ThrowWriteErrorsDoFn()));
      }
    }

    // Execute the pipeline and return the result.
    return pipeline.run();
  }

  static class ThrowWriteErrorsDoFn extends DoFn<BigQueryInsertError, Void> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      BigQueryInsertError insertError = Objects.requireNonNull(c.element());
      List<String> errorMessages =
          insertError.getError().getErrors().stream()
              .map(ErrorProto::getMessage)
              .collect(Collectors.toList());
      String stackTrace = String.join("\nCaused by:", errorMessages);

      throw new IllegalStateException(
          String.format(
              "Failed to insert row %s.\nCaused by: %s", insertError.getRow(), stackTrace));
    }
  }

  /**
   * Create the {@link Write} transform that outputs the collection to BigQuery as per input option.
   */
  @VisibleForTesting
  static Write<TableRow> writeToBQTransform(JdbcToBigQueryOptions options) {
    // Needed for loading GCS filesystem before Pipeline.Create call
    FileSystems.setDefaultPipelineOptions(options);
    Write<TableRow> write =
        BigQueryIO.writeTableRows()
            .withoutValidation()
            .withCreateDisposition(Write.CreateDisposition.valueOf(options.getCreateDisposition()))
            .withWriteDisposition(
                options.getIsTruncate()
                    ? Write.WriteDisposition.WRITE_TRUNCATE
                    : Write.WriteDisposition.WRITE_APPEND)
            .withCustomGcsTempLocation(
                StaticValueProvider.of(options.getBigQueryLoadingTemporaryDirectory()))
            .withExtendedErrorInfo()
            .to(options.getOutputTable());

    if (Write.CreateDisposition.valueOf(options.getCreateDisposition())
        != Write.CreateDisposition.CREATE_NEVER) {
      write = write.withJsonSchema(getGcsFileAsString(options.getBigQuerySchemaPath()));
    }
    if (options.getUseStorageWriteApi() || options.getUseStorageWriteApiAtLeastOnce()) {
      write = write.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors());
    }

    return write;
  }

  /**
   * Retrieves a secret value from SecretManagerUtils if the input string matches the specified
   * pattern.
   *
   * @param secret The input string representing a potential secret.
   * @return The secret value if the input matches the pattern and the secret is found, otherwise
   *     the original input string.
   */
  private static String maybeParseSecret(String secret) {
    // Check if the input string is not null.
    if (secret != null) {
      // Check if the input string matches the pattern for secrets stored in SecretManagerUtils.
      if (secret.matches("projects/.*/secrets/.*/versions/.*")) { // Use .* to match any characters
        // Retrieve the secret value from SecretManagerUtils.
        return SecretManagerUtils.getSecret(secret);
      }
    }
    // If the input is null or doesn't match the pattern, return the original input.
    return secret;
  }
}

Langkah berikutnya