Modèle Pub/Sub vers BigQuery

Le modèle Pub/Sub vers BigQuery est un pipeline par flux qui lit les messages au format JSON de Pub/Sub et les écrit dans une table BigQuery. Vous pouvez éventuellement fournir une fonction définie par l'utilisateur (UDF) écrite en JavaScript pour traiter les messages entrants.

Conditions requises pour ce pipeline

  • La table BigQuery doit exister et posséder un schéma.
  • Les données de messages Pub/Sub doivent utiliser le format JSON, ou vous devez fournir une fonction définie par l'utilisateur qui convertit les données du message au format JSON. Les données JSON doivent correspondre au schéma de la table BigQuery. Par exemple, si les charges utiles JSON sont au format {"k1":"v1", "k2":"v2"}, la table BigQuery doit comporter deux colonnes de chaîne nommées k1 et k2.
  • Spécifiez le paramètre inputSubscription ou inputTopic, mais pas les deux.

Paramètres de modèle

Paramètres obligatoires

  • outputTableSpec: table BigQuery dans laquelle écrire, au format PROJECT_ID:DATASET_NAME.TABLE_NAME.

Paramètres facultatifs

  • inputTopic: sujet Pub/Sub à lire, au format projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • inputSubscription: abonnement Pub/Sub à lire, au format projects/<PROJECT_ID>/subscriptions/<SUBCRIPTION_NAME>.
  • outputDeadletterTable: table BigQuery à utiliser pour les messages qui n'ont pas pu atteindre la table de sortie, au format PROJECT_ID:DATASET_NAME.TABLE_NAME. Si la table n'existe pas, elle est créée lors de l'exécution du pipeline. Si ce paramètre n'est pas spécifié, la valeur OUTPUT_TABLE_SPEC_error_records est utilisée à la place.
  • useStorageWriteApiAtLeastOnce: spécifie la sémantique d'écriture, lorsque vous utilisez l'API Storage Write. Pour utiliser la sémantique de type "au moins une fois" (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), définissez ce paramètre sur "true". Pour utiliser la sémantique de type "exactement une fois", définissez le paramètre sur false. Ce paramètre ne s'applique que lorsque la valeur de useStorageWriteApi est définie sur true. La valeur par défaut est false.
  • useStorageWriteApi: si cette valeur est définie sur "true", le pipeline utilise l'API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). La valeur par défaut est false. Pour en savoir plus, consultez la page "Utiliser l'API Storage Write" (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams: spécifie le nombre de flux d'écriture, lorsque vous utilisez l'API Storage Write. Si useStorageWriteApi est défini sur true et useStorageWriteApiAtLeastOnce sur false, vous devez définir ce paramètre. La valeur par défaut est 0.
  • storageWriteApiTriggeringFrequencySec: spécifie la fréquence de déclenchement, en secondes, lorsque vous utilisez l'API Storage Write. Si useStorageWriteApi est défini sur true et useStorageWriteApiAtLeastOnce sur false, vous devez définir ce paramètre.
  • javascriptTextTransformGcsPath: URI Cloud Storage du fichier .js qui définit la fonction JavaScript définie par l'utilisateur (UDF) à utiliser. Exemple :gs://my-bucket/my-udfs/my_file.js
  • javascriptTextTransformFunctionName: nom de la fonction JavaScript définie par l'utilisateur à utiliser. Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la section https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples.
  • javascriptTextTransformReloadIntervalMinutes: spécifie la fréquence d'actualisation de l'UDF, en minutes. Si la valeur est supérieure à 0, Dataflow vérifie régulièrement le fichier UDF dans Cloud Storage et actualise l'UDF si le fichier est modifié. Ce paramètre vous permet de mettre à jour l'UDF pendant l'exécution du pipeline, sans avoir à redémarrer le job. Si la valeur est 0, l'actualisation de l'UDF est désactivée. La valeur par défaut est 0.

Fonction définie par l'utilisateur

Vous pouvez éventuellement étendre ce modèle en écrivant une fonction définie par l'utilisateur (UDF). Le modèle appelle l'UDF pour chaque élément d'entrée. Les charges utiles des éléments sont sérialisées sous forme de chaînes JSON. Pour en savoir plus, consultez la page Créer des fonctions définies par l'utilisateur pour les modèles Dataflow.

Spécification de la fonction

La spécification de l'UDF se présente comme suit :

  • Entrée : champ de données du message Pub/Sub, sérialisé en tant que chaîne JSON.
  • Résultat : chaîne JSON correspondant au schéma de la table de destination BigQuery.
  • Exécuter le modèle

    1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
    2. Accéder à la page Créer un job à partir d'un modèle
    3. Dans le champ Nom du job, saisissez un nom de job unique.
    4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

      Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

    5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub to BigQuery template.
    6. Dans les champs fournis, saisissez vos valeurs de paramètres.
    7. Facultatif : Pour passer du traitement de type "exactement une fois" au mode de traitement en flux continu de type "au moins une fois", sélectionnez Au moins une fois.
    8. Cliquez sur Run Job (Exécuter la tâche).

    Dans le shell ou le terminal, exécutez le modèle :

    gcloud dataflow flex-template run JOB_NAME \
        --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_BigQuery_Flex \
        --template-file-gcs-location REGION_NAME \
        --staging-location STAGING_LOCATION \
        --parameters \
    inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
    outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME

    Remplacez les éléments suivants :

    • JOB_NAME : nom de job unique de votre choix
    • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
    • VERSION : version du modèle que vous souhaitez utiliser

      Vous pouvez utiliser les valeurs suivantes :

    • STAGING_LOCATION : emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
    • TOPIC_NAME : nom de votre sujet Pub/Sub
    • DATASET : votre ensemble de données BigQuery.
    • TABLE_NAME : nom de votre table BigQuery.

    Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
           "inputTopic": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
           "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_BigQuery_Flex",
       }
    }

    Remplacez les éléments suivants :

    • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
    • JOB_NAME : nom de job unique de votre choix
    • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
    • VERSION : version du modèle que vous souhaitez utiliser

      Vous pouvez utiliser les valeurs suivantes :

    • STAGING_LOCATION : emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
    • TOPIC_NAME : nom de votre sujet Pub/Sub
    • DATASET : votre ensemble de données BigQuery.
    • TABLE_NAME : nom de votre table BigQuery.
    Java
    /*
     * Copyright (C) 2018 Google LLC
     *
     * Licensed under the Apache License, Version 2.0 (the "License"); you may not
     * use this file except in compliance with the License. You may obtain a copy of
     * the License at
     *
     *   http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
     * License for the specific language governing permissions and limitations under
     * the License.
     */
    package com.google.cloud.teleport.v2.templates;
    
    import static com.google.cloud.teleport.v2.templates.TextToBigQueryStreaming.wrapBigQueryInsertError;
    
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.cloud.teleport.metadata.MultiTemplate;
    import com.google.cloud.teleport.metadata.Template;
    import com.google.cloud.teleport.metadata.TemplateCategory;
    import com.google.cloud.teleport.metadata.TemplateParameter;
    import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
    import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
    import com.google.cloud.teleport.v2.options.BigQueryStorageApiStreamingOptions;
    import com.google.cloud.teleport.v2.templates.PubSubToBigQuery.Options;
    import com.google.cloud.teleport.v2.transforms.BigQueryConverters.FailsafeJsonToTableRow;
    import com.google.cloud.teleport.v2.transforms.ErrorConverters;
    import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.FailsafeJavascriptUdf;
    import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer;
    import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer.PythonExternalTextTransformerOptions;
    import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer.RowToPubSubFailsafeElementFn;
    import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
    import com.google.cloud.teleport.v2.utils.ResourceUtils;
    import com.google.cloud.teleport.v2.values.FailsafeElement;
    import com.google.common.base.Strings;
    import com.google.common.collect.ImmutableList;
    import java.nio.charset.StandardCharsets;
    import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.coders.CoderRegistry;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
    import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
    import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.Flatten;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionList;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.Row;
    import org.apache.beam.sdk.values.TupleTag;
    import org.apache.beam.sdk.values.TupleTagList;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * The {@link PubSubToBigQuery} pipeline is a streaming pipeline which ingests data in JSON format
     * from Cloud Pub/Sub, executes a UDF, and outputs the resulting records to BigQuery. Any errors
     * which occur in the transformation of the data or execution of the UDF will be output to a
     * separate errors table in BigQuery. The errors table will be created if it does not exist prior to
     * execution. Both output and error tables are specified by the user as template parameters.
     *
     * <p><b>Pipeline Requirements</b>
     *
     * <ul>
     *   <li>The Pub/Sub topic exists.
     *   <li>The BigQuery output table exists.
     * </ul>
     *
     * <p>Check out <a
     * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-googlecloud/README_PubSub_to_BigQuery_Flex.md">README</a>
     * for instructions on how to use or modify this template.
     */
    @MultiTemplate({
      @Template(
          name = "PubSub_to_BigQuery_Flex",
          category = TemplateCategory.STREAMING,
          displayName = "Pub/Sub to BigQuery",
          description =
              "The Pub/Sub to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Pub/Sub topic or subscription, and writes them to a BigQuery table. "
                  + "You can use the template as a quick solution to move Pub/Sub data to BigQuery. "
                  + "The template reads JSON-formatted messages from Pub/Sub and converts them to BigQuery elements.",
          optionsClass = Options.class,
          skipOptions = {
            "pythonExternalTextTransformGcsPath",
            "pythonExternalTextTransformFunctionName",
          },
          flexContainerName = "pubsub-to-bigquery",
          documentation =
              "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-bigquery",
          contactInformation = "https://cloud.google.com/support",
          requirements = {
            "The <a href=\"https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage\">`data` field</a> of Pub/Sub messages must use the JSON format, described in this <a href=\"https://developers.google.com/api-client-library/java/google-http-java-client/json\">JSON guide</a>. For example, messages with values in the `data` field formatted as `{\"k1\":\"v1\", \"k2\":\"v2\"}` can be inserted into a BigQuery table with two columns, named `k1` and `k2`, with a string data type.",
            "The output table must exist prior to running the pipeline. The table schema must match the input JSON objects."
          },
          streaming = true,
          supportsAtLeastOnce = true,
          supportsExactlyOnce = true),
      @Template(
          name = "PubSub_to_BigQuery_Xlang",
          category = TemplateCategory.STREAMING,
          displayName = "Pub/Sub to BigQuery with Python UDFs",
          type = Template.TemplateType.XLANG,
          description =
              "The Pub/Sub to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Pub/Sub topic or subscription, and writes them to a BigQuery table. "
                  + "You can use the template as a quick solution to move Pub/Sub data to BigQuery. "
                  + "The template reads JSON-formatted messages from Pub/Sub and converts them to BigQuery elements.",
          optionsClass = Options.class,
          skipOptions = {
            "javascriptTextTransformGcsPath",
            "javascriptTextTransformFunctionName",
            "javascriptTextTransformReloadIntervalMinutes"
          },
          flexContainerName = "pubsub-to-bigquery-xlang",
          documentation =
              "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-bigquery",
          contactInformation = "https://cloud.google.com/support",
          requirements = {
            "The <a href=\"https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage\">`data` field</a> of Pub/Sub messages must use the JSON format, described in this <a href=\"https://developers.google.com/api-client-library/java/google-http-java-client/json\">JSON guide</a>. For example, messages with values in the `data` field formatted as `{\"k1\":\"v1\", \"k2\":\"v2\"}` can be inserted into a BigQuery table with two columns, named `k1` and `k2`, with a string data type.",
            "The output table must exist prior to running the pipeline. The table schema must match the input JSON objects."
          },
          streaming = true,
          supportsAtLeastOnce = true,
          supportsExactlyOnce = true)
    })
    public class PubSubToBigQuery {
    
      /** The log to output status messages to. */
      private static final Logger LOG = LoggerFactory.getLogger(PubSubToBigQuery.class);
    
      /** The tag for the main output for the UDF. */
      public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_OUT =
          new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
    
      /** The tag for the main output of the json transformation. */
      public static final TupleTag<TableRow> TRANSFORM_OUT = new TupleTag<TableRow>() {};
    
      /** The tag for the dead-letter output of the udf. */
      public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_DEADLETTER_OUT =
          new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
    
      /** The tag for the dead-letter output of the json to table row transform. */
      public static final TupleTag<FailsafeElement<PubsubMessage, String>> TRANSFORM_DEADLETTER_OUT =
          new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
    
      /** The default suffix for error tables if dead letter table is not specified. */
      public static final String DEFAULT_DEADLETTER_TABLE_SUFFIX = "_error_records";
    
      /** Pubsub message/string coder for pipeline. */
      public static final FailsafeElementCoder<PubsubMessage, String> CODER =
          FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());
    
      /** String/String Coder for FailsafeElement. */
      public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
          FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
    
      /**
       * The {@link Options} class provides the custom execution options passed by the executor at the
       * command-line.
       */
      public interface Options
          extends PipelineOptions,
              BigQueryStorageApiStreamingOptions,
              PythonExternalTextTransformerOptions,
              DataflowPipelineWorkerPoolOptions {
        @TemplateParameter.BigQueryTable(
            order = 1,
            groupName = "Target",
            description = "BigQuery output table",
            helpText =
                "The BigQuery table to write to, formatted as `PROJECT_ID:DATASET_NAME.TABLE_NAME`.")
        String getOutputTableSpec();
    
        void setOutputTableSpec(String value);
    
        @TemplateParameter.PubsubTopic(
            order = 2,
            groupName = "Source",
            optional = true,
            description = "Input Pub/Sub topic",
            helpText =
                "The Pub/Sub topic to read from, formatted as `projects/<PROJECT_ID>/topics/<TOPIC_NAME>`.")
        String getInputTopic();
    
        void setInputTopic(String value);
    
        @TemplateParameter.PubsubSubscription(
            order = 3,
            groupName = "Source",
            optional = true,
            description = "Pub/Sub input subscription",
            helpText =
                "The Pub/Sub subscription to read from, "
                    + "formatted as `projects/<PROJECT_ID>/subscriptions/<SUBCRIPTION_NAME>`.")
        String getInputSubscription();
    
        void setInputSubscription(String value);
    
        @TemplateParameter.BigQueryTable(
            order = 4,
            optional = true,
            description =
                "Table for messages failed to reach the output table (i.e., Deadletter table)",
            helpText =
                "The BigQuery table to use for messages that failed to reach the output table, "
                    + "formatted as `PROJECT_ID:DATASET_NAME.TABLE_NAME`. If the table "
                    + "doesn't exist, it is created when the pipeline runs. "
                    + "If this parameter is not specified, "
                    + "the value `OUTPUT_TABLE_SPEC_error_records` is used instead.")
        String getOutputDeadletterTable();
    
        void setOutputDeadletterTable(String value);
    
        @TemplateParameter.Boolean(
            order = 5,
            optional = true,
            parentName = "useStorageWriteApi",
            parentTriggerValues = {"true"},
            description = "Use at at-least-once semantics in BigQuery Storage Write API",
            helpText =
                "When using the Storage Write API, specifies the write semantics. "
                    + "To use at-least-once semantics (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics)"
                    + ", set this parameter to true. "
                    + "To use exactly-once semantics, set the parameter to `false`. "
                    + "This parameter applies only when `useStorageWriteApi` is `true`. "
                    + "The default value is `false`.")
        @Default.Boolean(false)
        @Override
        Boolean getUseStorageWriteApiAtLeastOnce();
    
        void setUseStorageWriteApiAtLeastOnce(Boolean value);
      }
    
      /**
       * The main entry-point for pipeline execution. This method will start the pipeline but will not
       * wait for it's execution to finish. If blocking execution is required, use the {@link
       * PubSubToBigQuery#run(Options)} method to start the pipeline and invoke {@code
       * result.waitUntilFinish()} on the {@link PipelineResult}.
       *
       * @param args The command-line args passed by the executor.
       */
      public static void main(String[] args) {
        UncaughtExceptionLogger.register();
    
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
        BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);
        //    options.setWorkerDiskType(
        //
        // "compute.googleapis.com/projects/cloud-teleport-testing/zones/us-central1-a/diskTypes/t2a-test");
    
        run(options);
      }
    
      /**
       * Runs the pipeline to completion with the specified options. This method does not wait until the
       * pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
       * object to block until the pipeline is finished running if blocking programmatic execution is
       * required.
       *
       * @param options The execution options.
       * @return The pipeline result.
       */
      public static PipelineResult run(Options options) {
    
        boolean useInputSubscription = !Strings.isNullOrEmpty(options.getInputSubscription());
        boolean useInputTopic = !Strings.isNullOrEmpty(options.getInputTopic());
        if (useInputSubscription == useInputTopic) {
          throw new IllegalArgumentException(
              "Either input topic or input subscription must be provided, but not both.");
        }
    
        Pipeline pipeline = Pipeline.create(options);
    
        CoderRegistry coderRegistry = pipeline.getCoderRegistry();
        coderRegistry.registerCoderForType(CODER.getEncodedTypeDescriptor(), CODER);
    
        /*
         * Steps:
         *  1) Read messages in from Pub/Sub
         *  2) Transform the PubsubMessages into TableRows
         *     - Transform message payload via UDF
         *     - Convert UDF result to TableRow objects
         *  3) Write successful records out to BigQuery
         *  4) Write failed records out to BigQuery
         */
    
        /*
         * Step #1: Read messages in from Pub/Sub
         * Either from a Subscription or Topic
         */
    
        PCollection<PubsubMessage> messages = null;
        if (useInputSubscription) {
          messages =
              pipeline.apply(
                  "ReadPubSubSubscription",
                  PubsubIO.readMessagesWithAttributes()
                      .fromSubscription(options.getInputSubscription()));
        } else {
          messages =
              pipeline.apply(
                  "ReadPubSubTopic",
                  PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()));
        }
    
        PCollectionTuple convertedTableRows =
            messages
                /*
                 * Step #2: Transform the PubsubMessages into TableRows
                 */
                .apply("ConvertMessageToTableRow", new PubsubMessageToTableRow(options));
    
        /*
         * Step #3: Write the successful records out to BigQuery
         */
        WriteResult writeResult =
            convertedTableRows
                .get(TRANSFORM_OUT)
                .apply(
                    "WriteSuccessfulRecords",
                    BigQueryIO.writeTableRows()
                        .withoutValidation()
                        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                        .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                        .withExtendedErrorInfo()
                        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                        .to(options.getOutputTableSpec()));
    
        /*
         * Step 3 Contd.
         * Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
         */
        PCollection<FailsafeElement<String, String>> failedInserts =
            BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
                .apply(
                    "WrapInsertionErrors",
                    MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                        .via((BigQueryInsertError e) -> wrapBigQueryInsertError(e)))
                .setCoder(FAILSAFE_ELEMENT_CODER);
    
        /*
         * Step #4: Write records that failed table row transformation
         * or conversion out to BigQuery deadletter table.
         */
        PCollectionList.of(
                ImmutableList.of(
                    convertedTableRows.get(UDF_DEADLETTER_OUT),
                    convertedTableRows.get(TRANSFORM_DEADLETTER_OUT)))
            .apply("Flatten", Flatten.pCollections())
            .apply(
                "WriteFailedRecords",
                ErrorConverters.WritePubsubMessageErrors.newBuilder()
                    .setErrorRecordsTable(
                        !Strings.isNullOrEmpty(options.getOutputDeadletterTable())
                            ? options.getOutputDeadletterTable()
                            : options.getOutputTableSpec() + DEFAULT_DEADLETTER_TABLE_SUFFIX)
                    .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
                    .build());
    
        // 5) Insert records that failed insert into deadletter table
        failedInserts.apply(
            "WriteFailedRecords",
            ErrorConverters.WriteStringMessageErrors.newBuilder()
                .setErrorRecordsTable(
                    !Strings.isNullOrEmpty(options.getOutputDeadletterTable())
                        ? options.getOutputDeadletterTable()
                        : options.getOutputTableSpec() + DEFAULT_DEADLETTER_TABLE_SUFFIX)
                .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
                .build());
    
        return pipeline.run();
      }
    
      /**
       * The {@link PubsubMessageToTableRow} class is a {@link PTransform} which transforms incoming
       * {@link PubsubMessage} objects into {@link TableRow} objects for insertion into BigQuery while
       * applying an optional UDF to the input. The executions of the UDF and transformation to {@link
       * TableRow} objects is done in a fail-safe way by wrapping the element with it's original payload
       * inside the {@link FailsafeElement} class. The {@link PubsubMessageToTableRow} transform will
       * output a {@link PCollectionTuple} which contains all output and dead-letter {@link
       * PCollection}.
       *
       * <p>The {@link PCollectionTuple} output will contain the following {@link PCollection}:
       *
       * <ul>
       *   <li>{@link PubSubToBigQuery#UDF_OUT} - Contains all {@link FailsafeElement} records
       *       successfully processed by the optional UDF.
       *   <li>{@link PubSubToBigQuery#UDF_DEADLETTER_OUT} - Contains all {@link FailsafeElement}
       *       records which failed processing during the UDF execution.
       *   <li>{@link PubSubToBigQuery#TRANSFORM_OUT} - Contains all records successfully converted from
       *       JSON to {@link TableRow} objects.
       *   <li>{@link PubSubToBigQuery#TRANSFORM_DEADLETTER_OUT} - Contains all {@link FailsafeElement}
       *       records which couldn't be converted to table rows.
       * </ul>
       */
      static class PubsubMessageToTableRow
          extends PTransform<PCollection<PubsubMessage>, PCollectionTuple> {
    
        private final Options options;
    
        PubsubMessageToTableRow(Options options) {
          this.options = options;
        }
    
        @Override
        public PCollectionTuple expand(PCollection<PubsubMessage> input) {
          boolean useJavascriptUdf =
              !Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath());
          boolean usePythonUdf =
              !Strings.isNullOrEmpty(options.getPythonExternalTextTransformGcsPath());
          if (useJavascriptUdf && usePythonUdf) {
            throw new IllegalArgumentException(
                "Either javascript or Python gcs path must be provided, but not both.");
          }
          PCollectionTuple udfOut;
          if (usePythonUdf) {
            PCollection<Row> udfRowsOut =
                input
                    // Map the incoming messages into FailsafeElements so we can recover from failures
                    // across multiple transforms.
                    .apply(
                        "MapToRecord",
                        PythonExternalTextTransformer.FailsafeRowPythonExternalUdf
                            .pubSubMappingFunction())
                    .setRowSchema(PythonExternalTextTransformer.FailsafeRowPythonExternalUdf.ROW_SCHEMA)
                    .apply(
                        "InvokeUDF",
                        PythonExternalTextTransformer.FailsafePythonExternalUdf.newBuilder()
                            .setFileSystemPath(options.getPythonExternalTextTransformGcsPath())
                            .setFunctionName(options.getPythonExternalTextTransformFunctionName())
                            .build());
            udfOut =
                udfRowsOut.apply(
                    "MapRowsToFailsafeElements",
                    ParDo.of(new RowToPubSubFailsafeElementFn(UDF_OUT, UDF_DEADLETTER_OUT))
                        .withOutputTags(UDF_OUT, TupleTagList.of(UDF_DEADLETTER_OUT)));
          } else {
            udfOut =
                input
                    // Map the incoming messages into FailsafeElements so we can recover from failures
                    // across multiple transforms.
                    .apply("MapToRecord", ParDo.of(new PubsubMessageToFailsafeElementFn()))
                    .apply(
                        "InvokeUDF",
                        FailsafeJavascriptUdf.<PubsubMessage>newBuilder()
                            .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
                            .setFunctionName(options.getJavascriptTextTransformFunctionName())
                            .setReloadIntervalMinutes(
                                options.getJavascriptTextTransformReloadIntervalMinutes())
                            .setSuccessTag(UDF_OUT)
                            .setFailureTag(UDF_DEADLETTER_OUT)
                            .build());
          }
    
          // Convert the records which were successfully processed by the UDF into TableRow objects.
          PCollectionTuple jsonToTableRowOut =
              udfOut
                  .get(UDF_OUT)
                  .apply(
                      "JsonToTableRow",
                      FailsafeJsonToTableRow.<PubsubMessage>newBuilder()
                          .setSuccessTag(TRANSFORM_OUT)
                          .setFailureTag(TRANSFORM_DEADLETTER_OUT)
                          .build());
    
          // Re-wrap the PCollections so we can return a single PCollectionTuple
          return PCollectionTuple.of(UDF_OUT, udfOut.get(UDF_OUT))
              .and(UDF_DEADLETTER_OUT, udfOut.get(UDF_DEADLETTER_OUT))
              .and(TRANSFORM_OUT, jsonToTableRowOut.get(TRANSFORM_OUT))
              .and(TRANSFORM_DEADLETTER_OUT, jsonToTableRowOut.get(TRANSFORM_DEADLETTER_OUT));
        }
      }
    
      /**
       * The {@link PubsubMessageToFailsafeElementFn} wraps an incoming {@link PubsubMessage} with the
       * {@link FailsafeElement} class so errors can be recovered from and the original message can be
       * output to a error records table.
       */
      static class PubsubMessageToFailsafeElementFn
          extends DoFn<PubsubMessage, FailsafeElement<PubsubMessage, String>> {
        @ProcessElement
        public void processElement(ProcessContext context) {
          PubsubMessage message = context.element();
          context.output(
              FailsafeElement.of(message, new String(message.getPayload(), StandardCharsets.UTF_8)));
        }
      }
    }
    

    Étape suivante