Modèle SQL Server vers BigQuery

Le modèle SQL Server vers BigQuery est un pipeline de traitement par lot qui copie les données d'une table SQL Server vers une table BigQuery existante. Ce pipeline utilise JDBC pour se connecter à SQL Server. Pour obtenir une couche supplémentaire de protection, vous pouvez également transmettre une clé Cloud KMS avec des paramètres de nom d'utilisateur, de mot de passe et de chaîne de connexion encodés en base64 et chiffrés avec la clé Cloud KMS. Pour en savoir plus sur le chiffrement des paramètres de nom d'utilisateur, de mot de passe et de chaîne de connexion, consultez la page Point de terminaison de chiffrement de l'API Cloud KMS.

Conditions requises pour ce pipeline

  • La table BigQuery doit exister avant l'exécution du pipeline.
  • La table BigQuery doit avoir un schéma compatible.
  • La base de données relationnelle doit être accessible à partir du sous-réseau dans lequel Dataflow est exécuté.

Paramètres de modèle

Paramètres obligatoires

  • driverJars: liste des fichiers JAR du pilote, séparés par une virgule. Exemple :gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar
  • driverClassName: nom de la classe du pilote JDBC. Exemple :com.mysql.jdbc.Driver
  • connectionURL: chaîne d'URL de connexion JDBC. Exemple :jdbc:mysql://some-host:3306/sampledb Vous pouvez transmettre cette valeur sous forme de chaîne chiffrée avec une clé Cloud KMS, puis encodée en base64. Supprimez les espaces blancs de la chaîne encodée en base64. Notez la différence entre une chaîne de connexion à une base de données Oracle non-RAC (jdbc:oracle:thin:@some-host:<port>:<sid>) et une chaîne de connexion à une base de données Oracle RAC (jdbc:oracle:thin:@//some-host[:<port>]/<service_name>). Par exemple, jdbc:mysql://some-host:3306/sampledb.
  • outputTable: emplacement de la table de sortie BigQuery. Exemple :<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>
  • bigQueryLoadingTemporaryDirectory: répertoire temporaire pour le processus de chargement de BigQuery. Exemple :gs://your-bucket/your-files/temp_dir

Paramètres facultatifs

  • connectionProperties: chaîne de propriétés à utiliser pour la connexion JDBC. Le format de la chaîne doit être [propertyName=property;]*. Pour en savoir plus, consultez la section "Propriétés de configuration" (https://dev.mysql.com/doc/connector-j/en/connector-j-reference-configuration-properties.html) dans la documentation MySQL. Par exemple, unicode=true;characterEncoding=UTF-8.
  • username: nom d'utilisateur à utiliser pour la connexion JDBC. Peut être transmis en tant que chaîne chiffrée avec une clé Cloud KMS ou en tant que secret Secret Manager au format projects/{project}/secrets/{secret}/versions/{secret_version}.
  • password: mot de passe à utiliser pour la connexion JDBC. Peut être transmis en tant que chaîne chiffrée avec une clé Cloud KMS ou en tant que secret Secret Manager au format projects/{project}/secrets/{secret}/versions/{secret_version}.
  • query: requête à exécuter sur la source pour extraire les données. Veuillez noter que certains types JDBC SQL et BigQuery, même s'ils partagent le même nom, présentent quelques différences. Voici quelques mappages de types SQL -> BigQuery importants à retenir : DATETIME --> TIMESTAMP. Une conversion de type peut être nécessaire si vos schémas ne correspondent pas. Par exemple, select * from sampledb.sample_table.
  • KMSEncryptionKey: clé de chiffrement Cloud KMS à utiliser pour déchiffrer le nom d'utilisateur, le mot de passe et la chaîne de connexion. Si vous transmettez une clé Cloud KMS, vous devez également chiffrer le nom d'utilisateur, le mot de passe et la chaîne de connexion. Exemple :projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key
  • useColumnAlias: si la valeur est true, le pipeline utilise l'alias de colonne (AS) au lieu du nom de colonne pour mapper les lignes sur BigQuery. La valeur par défaut est false.
  • isTruncate: si la valeur est true, le pipeline est tronqué avant de charger les données dans BigQuery. La valeur par défaut est false, ce qui amène le pipeline à ajouter des données.
  • partitionColumn: si ce paramètre est fourni avec le nom de table défini en tant que paramètre facultatif, JdbcIO lit la table en parallèle en exécutant plusieurs instances de la requête sur la même table (sous-requête) à l'aide de plages. Actuellement, ne prend en charge que les colonnes de partition Long.
  • table: table à lire lors de l'utilisation de partitions. Ce paramètre accepte également une sous-requête entre parenthèses. Exemple :(select id, name from Person) as subq
  • numPartitions: nombre de partitions. Avec les limites inférieure et supérieure, cette valeur forme des pas de partition pour les expressions de clause WHERE générées qui sont utilisées pour diviser la colonne de partition de manière uniforme. Lorsque l'entrée est inférieure à 1, le nombre est défini sur 1.
  • lowerBound: limite inférieure à utiliser dans le schéma de partition. Si cette valeur n'est pas fournie, elle est automatiquement déduite par Apache Beam pour les types compatibles.
  • upperBound: limite supérieure à utiliser dans le schéma de partition. Si cette valeur n'est pas fournie, elle est automatiquement déduite par Apache Beam pour les types compatibles.
  • fetchSize: nombre de lignes à extraire simultanément de la base de données. Non utilisé pour les lectures partitionnées. La valeur par défaut est 50 000.
  • createDisposition: la propriété CreateDisposition de BigQuery à utiliser. Par exemple, CREATE_IF_NEEDED ou CREATE_NEVER. La valeur par défaut est CREATE_NEVER.
  • bigQuerySchemaPath: chemin d'accès Cloud Storage pour le schéma JSON BigQuery. Si createDisposition est défini sur CREATE_IF_NEEDED, ce paramètre doit être spécifié. Par exemple, gs://your-bucket/your-schema.json.
  • outputDeadletterTable: table BigQuery à utiliser pour les messages qui n'ont pas pu atteindre la table de sortie, au format "PROJECT_ID:DATASET_NAME.TABLE_NAME". Si la table n'existe pas, elle est créée lors de l'exécution du pipeline. Si ce paramètre n'est pas spécifié, le pipeline échouera en cas d'erreur d'écriture.Ce paramètre ne peut être spécifié que si useStorageWriteApi ou useStorageWriteApiAtLeastOnce est défini sur "true".
  • disabledAlgorithms: algorithmes à désactiver, séparés par une virgule. Si cette valeur est définie sur none, aucun algorithme n'est désactivé. Utilisez ce paramètre avec prudence, car les algorithmes désactivés par défaut peuvent présenter des failles ou des problèmes de performances. Par exemple, SSLv3, RC4.
  • extraFilesToStage: chemins d'accès Cloud Storage ou secrets Secret Manager séparés par une virgule afin que les fichiers soient traités dans le nœud de calcul. Ces fichiers sont enregistrés dans le répertoire "/extra_files" de chaque nœud de calcul. Exemple :gs://<BUCKET_NAME>/file.txt,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<VERSION_ID>
  • useStorageWriteApi: si la valeur est true, le pipeline utilise l'API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). La valeur par défaut est false. Pour en savoir plus, consultez la page "Utiliser l'API Storage Write" (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • useStorageWriteApiAtLeastOnce: spécifie la sémantique d'écriture, lorsque vous utilisez l'API Storage Write. Pour utiliser la sémantique de type "au moins une fois" (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), définissez ce paramètre sur true. Pour utiliser la sémantique de type "exactement une fois", définissez le paramètre sur false. Ce paramètre ne s'applique que lorsque la valeur de useStorageWriteApi est définie sur true. La valeur par défaut est false.

Exécuter le modèle

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the SQL Server to BigQuery template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/SQLServer_to_BigQuery \
    --parameters \
connectionURL=JDBC_CONNECTION_URL,\
query=SOURCE_SQL_QUERY,\
outputTable=PROJECT_ID:DATASET.TABLE_NAME,
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS,\
connectionProperties=CONNECTION_PROPERTIES,\
username=CONNECTION_USERNAME,\
password=CONNECTION_PASSWORD,\
KMSEncryptionKey=KMS_ENCRYPTION_KEY

Remplacez les éléments suivants :

  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • JDBC_CONNECTION_URL : URL de connexion JDBC
  • SOURCE_SQL_QUERY : requête SQL à exécuter sur la base de données source.
  • DATASET : votre ensemble de données BigQuery.
  • TABLE_NAME : nom de votre table BigQuery.
  • PATH_TO_TEMP_DIR_ON_GCS : chemin d'accès Cloud Storage au répertoire temporaire
  • CONNECTION_PROPERTIES : propriétés de connexion JDBC, le cas échéant.
  • CONNECTION_USERNAME : nom d'utilisateur de la connexion JDBC
  • CONNECTION_PASSWORD : mot de passe de la connexion JDBC
  • KMS_ENCRYPTION_KEY : clé de chiffrement Cloud KMS.

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launchParameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/SQLServer_to_BigQuery",
    "parameters": {
      "connectionURL": "JDBC_CONNECTION_URL",
      "query": "SOURCE_SQL_QUERY",
      "outputTable": "PROJECT_ID:DATASET.TABLE_NAME",
      "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS",
      "connectionProperties": "CONNECTION_PROPERTIES",
      "username": "CONNECTION_USERNAME",
      "password": "CONNECTION_PASSWORD",
      "KMSEncryptionKey":"KMS_ENCRYPTION_KEY"
    },
    "environment": { "zone": "us-central1-f" }
  }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • JDBC_CONNECTION_URL : URL de connexion JDBC
  • SOURCE_SQL_QUERY : requête SQL à exécuter sur la base de données source.
  • DATASET : votre ensemble de données BigQuery.
  • TABLE_NAME : nom de votre table BigQuery.
  • PATH_TO_TEMP_DIR_ON_GCS : chemin d'accès Cloud Storage au répertoire temporaire
  • CONNECTION_PROPERTIES : propriétés de connexion JDBC, le cas échéant.
  • CONNECTION_USERNAME : nom d'utilisateur de la connexion JDBC
  • CONNECTION_PASSWORD : mot de passe de la connexion JDBC
  • KMS_ENCRYPTION_KEY : clé de chiffrement Cloud KMS.
Java
/*
 * Copyright (C) 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.transforms.BigQueryConverters.wrapBigQueryInsertError;
import static com.google.cloud.teleport.v2.utils.GCSUtils.getGcsFileAsString;
import static com.google.cloud.teleport.v2.utils.KMSUtils.maybeDecrypt;

import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.JdbcToBigQueryOptions;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.cloud.teleport.v2.utils.GCSAwareValueProvider;
import com.google.cloud.teleport.v2.utils.JdbcConverters;
import com.google.cloud.teleport.v2.utils.ResourceUtils;
import com.google.cloud.teleport.v2.utils.SecretManagerUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

/**
 * A template that copies data from a relational database using JDBC to an existing BigQuery table.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/jdbc-to-googlecloud/README_Jdbc_to_BigQuery_Flex.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Jdbc_to_BigQuery_Flex",
    category = TemplateCategory.BATCH,
    displayName = "JDBC to BigQuery with BigQuery Storage API support",
    description = {
      "The JDBC to BigQuery template is a batch pipeline that copies data from a relational database table into an existing BigQuery table. "
          + "This pipeline uses JDBC to connect to the relational database. You can use this template to copy data from any relational database with available JDBC drivers into BigQuery.",
      "For an extra layer of protection, you can also pass in a Cloud KMS key along with a Base64-encoded username, password, and connection string parameters encrypted with the Cloud KMS key. "
          + "See the <a href=\"https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt\">Cloud KMS API encryption endpoint</a> for additional details on encrypting your username, password, and connection string parameters."
    },
    optionsClass = JdbcToBigQueryOptions.class,
    flexContainerName = "jdbc-to-bigquery",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/jdbc-to-bigquery",
    contactInformation = "https://cloud.google.com/support",
    preview = true,
    requirements = {
      "The JDBC drivers for the relational database must be available.",
      "If BigQuery table already exist before pipeline execution, it must have a compatible schema.",
      "The relational database must be accessible from the subnet where Dataflow runs."
    })
public class JdbcToBigQuery {

  /** Coder for FailsafeElement. */
  private static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  /**
   * Main entry point for executing the pipeline. This will run the pipeline asynchronously. If
   * blocking execution is required, use the {@link JdbcToBigQuery#run} method to start the pipeline
   * and invoke {@code result.waitUntilFinish()} on the {@link PipelineResult}.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    // Parse the user options passed from the command-line
    JdbcToBigQueryOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(JdbcToBigQueryOptions.class);

    run(options, writeToBQTransform(options));
  }

  /**
   * Create the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @param writeToBQ the transform that outputs {@link TableRow}s to BigQuery.
   * @return The result of the pipeline execution.
   */
  @VisibleForTesting
  static PipelineResult run(JdbcToBigQueryOptions options, Write<TableRow> writeToBQ) {
    // Validate BQ STORAGE_WRITE_API options
    BigQueryIOUtils.validateBQStorageApiOptionsBatch(options);
    if (!options.getUseStorageWriteApi()
        && !options.getUseStorageWriteApiAtLeastOnce()
        && !Strings.isNullOrEmpty(options.getOutputDeadletterTable())) {
      throw new IllegalArgumentException(
          "outputDeadletterTable can only be specified if BigQuery Storage Write API is enabled either with useStorageWriteApi or useStorageWriteApiAtLeastOnce.");
    }

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    /*
     * Steps: 1) Read records via JDBC and convert to TableRow via RowMapper
     *        2) Append TableRow to BigQuery via BigQueryIO
     */
    JdbcIO.DataSourceConfiguration dataSourceConfiguration =
        JdbcIO.DataSourceConfiguration.create(
                StaticValueProvider.of(options.getDriverClassName()),
                maybeDecrypt(
                    maybeParseSecret(options.getConnectionURL()), options.getKMSEncryptionKey()))
            .withUsername(
                maybeDecrypt(
                    maybeParseSecret(options.getUsername()), options.getKMSEncryptionKey()))
            .withPassword(
                maybeDecrypt(
                    maybeParseSecret(options.getPassword()), options.getKMSEncryptionKey()));

    if (options.getDriverJars() != null) {
      dataSourceConfiguration = dataSourceConfiguration.withDriverJars(options.getDriverJars());
    }

    if (options.getConnectionProperties() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withConnectionProperties(options.getConnectionProperties());
    }

    /*
     * Step 1: Read records via JDBC and convert to TableRow
     *         via {@link org.apache.beam.sdk.io.jdbc.JdbcIO.RowMapper}
     */
    PCollection<TableRow> rows;
    if (options.getPartitionColumn() != null && options.getTable() != null) {
      // Read with Partitions
      JdbcIO.ReadWithPartitions<TableRow, ?> readIO = null;
      final String partitionColumnType = options.getPartitionColumnType();
      if (partitionColumnType == null || "long".equals(partitionColumnType)) {
        JdbcIO.ReadWithPartitions<TableRow, Long> longTypeReadIO =
            JdbcIO.<TableRow, Long>readWithPartitions(TypeDescriptors.longs())
                .withDataSourceConfiguration(dataSourceConfiguration)
                .withTable(options.getTable())
                .withPartitionColumn(options.getPartitionColumn())
                .withRowMapper(JdbcConverters.getResultSetToTableRow(options.getUseColumnAlias()));
        if (options.getLowerBound() != null && options.getUpperBound() != null) {
          // Check if lower bound and upper bound are long type.
          try {
            longTypeReadIO =
                longTypeReadIO
                    .withLowerBound(Long.valueOf(options.getLowerBound()))
                    .withUpperBound(Long.valueOf(options.getUpperBound()));
          } catch (NumberFormatException e) {
            throw new NumberFormatException(
                "Expected Long values for lowerBound and upperBound, received : " + e.getMessage());
          }
        }
        readIO = longTypeReadIO;
      } else if ("datetime".equals(partitionColumnType)) {
        JdbcIO.ReadWithPartitions<TableRow, DateTime> dateTimeReadIO =
            JdbcIO.<TableRow, DateTime>readWithPartitions(TypeDescriptor.of(DateTime.class))
                .withDataSourceConfiguration(dataSourceConfiguration)
                .withTable(options.getTable())
                .withPartitionColumn(options.getPartitionColumn())
                .withRowMapper(JdbcConverters.getResultSetToTableRow(options.getUseColumnAlias()));
        if (options.getLowerBound() != null && options.getUpperBound() != null) {
          DateTimeFormatter dateFormatter =
              DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSZ").withOffsetParsed();
          // Check if lowerBound and upperBound are DateTime type.
          try {
            dateTimeReadIO =
                dateTimeReadIO
                    .withLowerBound(dateFormatter.parseDateTime(options.getLowerBound()))
                    .withUpperBound(dateFormatter.parseDateTime(options.getUpperBound()));
          } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                "Expected DateTime values in the format for lowerBound and upperBound, received : "
                    + e.getMessage());
          }
        }
        readIO = dateTimeReadIO;
      } else {
        throw new IllegalStateException("Received unsupported partitionColumnType.");
      }
      if (options.getNumPartitions() != null) {
        readIO = readIO.withNumPartitions(options.getNumPartitions());
      }
      if (options.getFetchSize() != null && options.getFetchSize() > 0) {
        readIO = readIO.withFetchSize(options.getFetchSize());
      }

      rows = pipeline.apply("Read from JDBC with Partitions", readIO);
    } else {
      if (options.getQuery() == null) {
        throw new IllegalArgumentException(
            "Either 'query' or both 'table' AND 'PartitionColumn' must be specified to read from JDBC");
      }
      JdbcIO.Read<TableRow> readIO =
          JdbcIO.<TableRow>read()
              .withDataSourceConfiguration(dataSourceConfiguration)
              .withQuery(new GCSAwareValueProvider(options.getQuery()))
              .withCoder(TableRowJsonCoder.of())
              .withRowMapper(JdbcConverters.getResultSetToTableRow(options.getUseColumnAlias()));

      if (options.getFetchSize() != null && options.getFetchSize() > 0) {
        readIO = readIO.withFetchSize(options.getFetchSize());
      }

      rows = pipeline.apply("Read from JdbcIO", readIO);
    }

    /*
     * Step 2: Append TableRow to an existing BigQuery table
     */
    WriteResult writeResult = rows.apply("Write to BigQuery", writeToBQ);

    /*
     * Step 3.
     * If using Storage Write API, capture failed inserts and either
     *   a) write error rows to DLQ
     *   b) fail the pipeline
     */
    if (options.getUseStorageWriteApi() || options.getUseStorageWriteApiAtLeastOnce()) {
      PCollection<BigQueryInsertError> insertErrors =
          BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options);

      if (!Strings.isNullOrEmpty(options.getOutputDeadletterTable())) {
        /*
         * Step 3a.
         * Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
         */
        PCollection<FailsafeElement<String, String>> failedInserts =
            insertErrors
                .apply(
                    "WrapInsertionErrors",
                    MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                        .via((BigQueryInsertError e) -> wrapBigQueryInsertError(e)))
                .setCoder(FAILSAFE_ELEMENT_CODER);

        /*
         * Step 3a Contd.
         * Insert records that failed insert into deadletter table
         */
        failedInserts.apply(
            "WriteFailedRecords",
            ErrorConverters.WriteStringMessageErrors.newBuilder()
                .setErrorRecordsTable(options.getOutputDeadletterTable())
                .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
                .setUseWindowedTimestamp(false)
                .build());
      } else {
        /*
         * Step 3b.
         * Fail pipeline upon write errors if no DLQ was specified
         */
        insertErrors.apply(ParDo.of(new ThrowWriteErrorsDoFn()));
      }
    }

    // Execute the pipeline and return the result.
    return pipeline.run();
  }

  static class ThrowWriteErrorsDoFn extends DoFn<BigQueryInsertError, Void> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      BigQueryInsertError insertError = Objects.requireNonNull(c.element());
      List<String> errorMessages =
          insertError.getError().getErrors().stream()
              .map(ErrorProto::getMessage)
              .collect(Collectors.toList());
      String stackTrace = String.join("\nCaused by:", errorMessages);

      throw new IllegalStateException(
          String.format(
              "Failed to insert row %s.\nCaused by: %s", insertError.getRow(), stackTrace));
    }
  }

  /**
   * Create the {@link Write} transform that outputs the collection to BigQuery as per input option.
   */
  @VisibleForTesting
  static Write<TableRow> writeToBQTransform(JdbcToBigQueryOptions options) {
    // Needed for loading GCS filesystem before Pipeline.Create call
    FileSystems.setDefaultPipelineOptions(options);
    Write<TableRow> write =
        BigQueryIO.writeTableRows()
            .withoutValidation()
            .withCreateDisposition(Write.CreateDisposition.valueOf(options.getCreateDisposition()))
            .withWriteDisposition(
                options.getIsTruncate()
                    ? Write.WriteDisposition.WRITE_TRUNCATE
                    : Write.WriteDisposition.WRITE_APPEND)
            .withCustomGcsTempLocation(
                StaticValueProvider.of(options.getBigQueryLoadingTemporaryDirectory()))
            .withExtendedErrorInfo()
            .to(options.getOutputTable());

    if (Write.CreateDisposition.valueOf(options.getCreateDisposition())
        != Write.CreateDisposition.CREATE_NEVER) {
      write = write.withJsonSchema(getGcsFileAsString(options.getBigQuerySchemaPath()));
    }
    if (options.getUseStorageWriteApi() || options.getUseStorageWriteApiAtLeastOnce()) {
      write = write.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors());
    }

    return write;
  }

  /**
   * Retrieves a secret value from SecretManagerUtils if the input string matches the specified
   * pattern.
   *
   * @param secret The input string representing a potential secret.
   * @return The secret value if the input matches the pattern and the secret is found, otherwise
   *     the original input string.
   */
  private static String maybeParseSecret(String secret) {
    // Check if the input string is not null.
    if (secret != null) {
      // Check if the input string matches the pattern for secrets stored in SecretManagerUtils.
      if (secret.matches("projects/.*/secrets/.*/versions/.*")) { // Use .* to match any characters
        // Retrieve the secret value from SecretManagerUtils.
        return SecretManagerUtils.getSecret(secret);
      }
    }
    // If the input is null or doesn't match the pattern, return the original input.
    return secret;
  }
}

Étape suivante