Modello Pub/Sub a BigQuery

Il modello Pub/Sub to BigQuery è una pipeline in modalità flusso che legge i messaggi in formato JSON da Pub/Sub e li scrive in una tabella BigQuery. Se vuoi, puoi fornire una funzione definita dall'utente (UDF) scritta in JavaScript per elaborare i messaggi in arrivo.

Requisiti della pipeline

  • La tabella BigQuery deve esistere e avere uno schema.
  • I dati dei messaggi Pub/Sub devono utilizzare il formato JSON oppure devi fornire una UDF che li converta in JSON. I dati JSON devono corrispondere allo schema della tabella BigQuery. Ad esempio, se i payload JSON sono formattati come {"k1":"v1", "k2":"v2"}, la tabella BigQuery deve avere due colonne di stringhe denominate k1 e k2.
  • Specifica il parametro inputSubscription o inputTopic, ma non entrambi.

Parametri del modello

Parametri obbligatori

  • outputTableSpec: la tabella BigQuery in cui scrivere, formattata come PROJECT_ID:DATASET_NAME.TABLE_NAME.

Parametri facoltativi

  • inputTopic: l'argomento Pub/Sub da cui leggere, formattato come projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • inputSubscription: la sottoscrizione Pub/Sub da cui leggere, formattata come projects/<PROJECT_ID>/subscriptions/<SUBCRIPTION_NAME>.
  • outputDeadletterTable: la tabella BigQuery da utilizzare per i messaggi che non sono riusciti a raggiungere la tabella di output, formattata come PROJECT_ID:DATASET_NAME.TABLE_NAME. Se la tabella non esiste, viene creata quando viene eseguita la pipeline. Se questo parametro non è specificato, viene utilizzato il valore OUTPUT_TABLE_SPEC_error_records.
  • useStorageWriteApiAtLeastOnce: quando utilizzi l'API Storage Write, specifica la semantica di scrittura. Per utilizzare la semantica almeno una volta (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), imposta questo parametro su true. Per utilizzare la semantica esattamente una volta, imposta il parametro su false. Questo parametro si applica solo quando useStorageWriteApi è true. Il valore predefinito è false.
  • useStorageWriteApi: se true, la pipeline utilizza l'API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). Il valore predefinito è false. Per ulteriori informazioni, consulta Utilizzo dell'API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams: quando utilizzi l'API Storage Write, specifica il numero di stream di scrittura. Se useStorageWriteApi è true e useStorageWriteApiAtLeastOnce è false, devi impostare questo parametro. Il valore predefinito è 0.
  • storageWriteApiTriggeringFrequencySec: quando utilizzi l'API Storage Write, specifica la frequenza di attivazione in secondi. Se useStorageWriteApi è true e useStorageWriteApiAtLeastOnce è false, devi impostare questo parametro.
  • javascriptTextTransformGcsPath: l'URI Cloud Storage del file .js che definisce la funzione JavaScript definita dall'utente (UDF) da utilizzare. Ad esempio, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: il nome della funzione definita dall'utente (UDF) JavaScript da utilizzare. Ad esempio, se il codice della funzione JavaScript è myTransform(inJson) { /*...do stuff...*/ }, il nome della funzione è myTransform. Per esempi di funzioni JavaScript definite dall'utente, consulta Esempi di funzioni UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: specifica la frequenza con cui ricaricare la UDF, in minuti. Se il valore è maggiore di 0, Dataflow controlla periodicamente il file UDF in Cloud Storage e lo ricarica se il file viene modificato. Questo parametro ti consente di aggiornare la UDF durante l'esecuzione della pipeline, senza dover riavviare il job. Se il valore è 0, il ricaricamento delle funzioni definite dall'utente è disattivato. Il valore predefinito è 0.

Funzione definita dall'utente

Se vuoi, puoi estendere questo modello scrivendo una funzione definita dall'utente (UDF). Il modello chiama la UDF per ogni elemento di input. I payload degli elementi vengono serializzati come stringhe JSON. Per ulteriori informazioni, consulta Creare funzioni predefinite dall'utente per i modelli Dataflow.

Specifiche della funzione

La UDF ha la seguente specifica:

  • Input: il campo dei dati del messaggio Pub/Sub, serializzato come stringa JSON.
  • Output: una stringa JSON che corrisponde allo schema della tabella di destinazione BigQuery.
  • Esegui il modello

    1. Vai alla pagina Crea job da modello di Dataflow.
    2. Vai a Crea job da modello
    3. Nel campo Nome job, inserisci un nome univoco per il job.
    4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

      Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

    5. Nel menu a discesa Modello di flusso di dati, seleziona the Pub/Sub to BigQuery template.
    6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
    7. (Facoltativo) Per passare dall'elaborazione exactly-once alla modalità di streaming Almeno una volta, seleziona Almeno una volta.
    8. Fai clic su Esegui job.

    Nella shell o nel terminale, esegui il modello:

    gcloud dataflow flex-template run JOB_NAME \
        --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_BigQuery_Flex \
        --template-file-gcs-location REGION_NAME \
        --staging-location STAGING_LOCATION \
        --parameters \
    inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
    outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME

    Sostituisci quanto segue:

    • JOB_NAME: un nome di job univoco a tua scelta
    • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
    • VERSION: la versione del modello che vuoi utilizzare

      Puoi utilizzare i seguenti valori:

    • STAGING_LOCATION: la posizione per l'organizzazione in anteprima dei file locali (ad esempio gs://your-bucket/staging)
    • TOPIC_NAME: il nome dell'argomento Pub/Sub
    • DATASET: il tuo set di dati BigQuery
    • TABLE_NAME: il nome della tabella BigQuery

    Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
           "inputTopic": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
           "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_BigQuery_Flex",
       }
    }

    Sostituisci quanto segue:

    • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
    • JOB_NAME: un nome di job univoco a tua scelta
    • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
    • VERSION: la versione del modello che vuoi utilizzare

      Puoi utilizzare i seguenti valori:

    • STAGING_LOCATION: la posizione per l'organizzazione in anteprima dei file locali (ad esempio gs://your-bucket/staging)
    • TOPIC_NAME: il nome dell'argomento Pub/Sub
    • DATASET: il tuo set di dati BigQuery
    • TABLE_NAME: il nome della tabella BigQuery
    Java
    /*
     * Copyright (C) 2018 Google LLC
     *
     * Licensed under the Apache License, Version 2.0 (the "License"); you may not
     * use this file except in compliance with the License. You may obtain a copy of
     * the License at
     *
     *   http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
     * License for the specific language governing permissions and limitations under
     * the License.
     */
    package com.google.cloud.teleport.v2.templates;
    
    import static com.google.cloud.teleport.v2.templates.TextToBigQueryStreaming.wrapBigQueryInsertError;
    
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.cloud.teleport.metadata.MultiTemplate;
    import com.google.cloud.teleport.metadata.Template;
    import com.google.cloud.teleport.metadata.TemplateCategory;
    import com.google.cloud.teleport.metadata.TemplateParameter;
    import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
    import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
    import com.google.cloud.teleport.v2.options.BigQueryStorageApiStreamingOptions;
    import com.google.cloud.teleport.v2.templates.PubSubToBigQuery.Options;
    import com.google.cloud.teleport.v2.transforms.BigQueryConverters.FailsafeJsonToTableRow;
    import com.google.cloud.teleport.v2.transforms.ErrorConverters;
    import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.FailsafeJavascriptUdf;
    import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer;
    import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer.PythonExternalTextTransformerOptions;
    import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer.RowToPubSubFailsafeElementFn;
    import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
    import com.google.cloud.teleport.v2.utils.ResourceUtils;
    import com.google.cloud.teleport.v2.values.FailsafeElement;
    import com.google.common.base.Strings;
    import com.google.common.collect.ImmutableList;
    import java.nio.charset.StandardCharsets;
    import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.coders.CoderRegistry;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
    import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
    import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.Flatten;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionList;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.Row;
    import org.apache.beam.sdk.values.TupleTag;
    import org.apache.beam.sdk.values.TupleTagList;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * The {@link PubSubToBigQuery} pipeline is a streaming pipeline which ingests data in JSON format
     * from Cloud Pub/Sub, executes a UDF, and outputs the resulting records to BigQuery. Any errors
     * which occur in the transformation of the data or execution of the UDF will be output to a
     * separate errors table in BigQuery. The errors table will be created if it does not exist prior to
     * execution. Both output and error tables are specified by the user as template parameters.
     *
     * <p><b>Pipeline Requirements</b>
     *
     * <ul>
     *   <li>The Pub/Sub topic exists.
     *   <li>The BigQuery output table exists.
     * </ul>
     *
     * <p>Check out <a
     * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-googlecloud/README_PubSub_to_BigQuery_Flex.md">README</a>
     * for instructions on how to use or modify this template.
     */
    @MultiTemplate({
      @Template(
          name = "PubSub_to_BigQuery_Flex",
          category = TemplateCategory.STREAMING,
          displayName = "Pub/Sub to BigQuery",
          description =
              "The Pub/Sub to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Pub/Sub topic or subscription, and writes them to a BigQuery table. "
                  + "You can use the template as a quick solution to move Pub/Sub data to BigQuery. "
                  + "The template reads JSON-formatted messages from Pub/Sub and converts them to BigQuery elements.",
          optionsClass = Options.class,
          skipOptions = {
            "pythonExternalTextTransformGcsPath",
            "pythonExternalTextTransformFunctionName",
          },
          flexContainerName = "pubsub-to-bigquery",
          documentation =
              "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-bigquery",
          contactInformation = "https://cloud.google.com/support",
          requirements = {
            "The <a href=\"https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage\">`data` field</a> of Pub/Sub messages must use the JSON format, described in this <a href=\"https://developers.google.com/api-client-library/java/google-http-java-client/json\">JSON guide</a>. For example, messages with values in the `data` field formatted as `{\"k1\":\"v1\", \"k2\":\"v2\"}` can be inserted into a BigQuery table with two columns, named `k1` and `k2`, with a string data type.",
            "The output table must exist prior to running the pipeline. The table schema must match the input JSON objects."
          },
          streaming = true,
          supportsAtLeastOnce = true,
          supportsExactlyOnce = true),
      @Template(
          name = "PubSub_to_BigQuery_Xlang",
          category = TemplateCategory.STREAMING,
          displayName = "Pub/Sub to BigQuery with Python UDFs",
          type = Template.TemplateType.XLANG,
          description =
              "The Pub/Sub to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Pub/Sub topic or subscription, and writes them to a BigQuery table. "
                  + "You can use the template as a quick solution to move Pub/Sub data to BigQuery. "
                  + "The template reads JSON-formatted messages from Pub/Sub and converts them to BigQuery elements.",
          optionsClass = Options.class,
          skipOptions = {
            "javascriptTextTransformGcsPath",
            "javascriptTextTransformFunctionName",
            "javascriptTextTransformReloadIntervalMinutes"
          },
          flexContainerName = "pubsub-to-bigquery-xlang",
          documentation =
              "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-bigquery",
          contactInformation = "https://cloud.google.com/support",
          requirements = {
            "The <a href=\"https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage\">`data` field</a> of Pub/Sub messages must use the JSON format, described in this <a href=\"https://developers.google.com/api-client-library/java/google-http-java-client/json\">JSON guide</a>. For example, messages with values in the `data` field formatted as `{\"k1\":\"v1\", \"k2\":\"v2\"}` can be inserted into a BigQuery table with two columns, named `k1` and `k2`, with a string data type.",
            "The output table must exist prior to running the pipeline. The table schema must match the input JSON objects."
          },
          streaming = true,
          supportsAtLeastOnce = true,
          supportsExactlyOnce = true)
    })
    public class PubSubToBigQuery {
    
      /** The log to output status messages to. */
      private static final Logger LOG = LoggerFactory.getLogger(PubSubToBigQuery.class);
    
      /** The tag for the main output for the UDF. */
      public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_OUT =
          new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
    
      /** The tag for the main output of the json transformation. */
      public static final TupleTag<TableRow> TRANSFORM_OUT = new TupleTag<TableRow>() {};
    
      /** The tag for the dead-letter output of the udf. */
      public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_DEADLETTER_OUT =
          new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
    
      /** The tag for the dead-letter output of the json to table row transform. */
      public static final TupleTag<FailsafeElement<PubsubMessage, String>> TRANSFORM_DEADLETTER_OUT =
          new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
    
      /** The default suffix for error tables if dead letter table is not specified. */
      public static final String DEFAULT_DEADLETTER_TABLE_SUFFIX = "_error_records";
    
      /** Pubsub message/string coder for pipeline. */
      public static final FailsafeElementCoder<PubsubMessage, String> CODER =
          FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());
    
      /** String/String Coder for FailsafeElement. */
      public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
          FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
    
      /**
       * The {@link Options} class provides the custom execution options passed by the executor at the
       * command-line.
       */
      public interface Options
          extends PipelineOptions,
              BigQueryStorageApiStreamingOptions,
              PythonExternalTextTransformerOptions,
              DataflowPipelineWorkerPoolOptions {
        @TemplateParameter.BigQueryTable(
            order = 1,
            groupName = "Target",
            description = "BigQuery output table",
            helpText =
                "The BigQuery table to write to, formatted as `PROJECT_ID:DATASET_NAME.TABLE_NAME`.")
        String getOutputTableSpec();
    
        void setOutputTableSpec(String value);
    
        @TemplateParameter.PubsubTopic(
            order = 2,
            groupName = "Source",
            optional = true,
            description = "Input Pub/Sub topic",
            helpText =
                "The Pub/Sub topic to read from, formatted as `projects/<PROJECT_ID>/topics/<TOPIC_NAME>`.")
        String getInputTopic();
    
        void setInputTopic(String value);
    
        @TemplateParameter.PubsubSubscription(
            order = 3,
            groupName = "Source",
            optional = true,
            description = "Pub/Sub input subscription",
            helpText =
                "The Pub/Sub subscription to read from, "
                    + "formatted as `projects/<PROJECT_ID>/subscriptions/<SUBCRIPTION_NAME>`.")
        String getInputSubscription();
    
        void setInputSubscription(String value);
    
        @TemplateParameter.BigQueryTable(
            order = 4,
            optional = true,
            description =
                "Table for messages failed to reach the output table (i.e., Deadletter table)",
            helpText =
                "The BigQuery table to use for messages that failed to reach the output table, "
                    + "formatted as `PROJECT_ID:DATASET_NAME.TABLE_NAME`. If the table "
                    + "doesn't exist, it is created when the pipeline runs. "
                    + "If this parameter is not specified, "
                    + "the value `OUTPUT_TABLE_SPEC_error_records` is used instead.")
        String getOutputDeadletterTable();
    
        void setOutputDeadletterTable(String value);
    
        @TemplateParameter.Boolean(
            order = 5,
            optional = true,
            parentName = "useStorageWriteApi",
            parentTriggerValues = {"true"},
            description = "Use at at-least-once semantics in BigQuery Storage Write API",
            helpText =
                "When using the Storage Write API, specifies the write semantics. "
                    + "To use at-least-once semantics (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics)"
                    + ", set this parameter to true. "
                    + "To use exactly-once semantics, set the parameter to `false`. "
                    + "This parameter applies only when `useStorageWriteApi` is `true`. "
                    + "The default value is `false`.")
        @Default.Boolean(false)
        @Override
        Boolean getUseStorageWriteApiAtLeastOnce();
    
        void setUseStorageWriteApiAtLeastOnce(Boolean value);
      }
    
      /**
       * The main entry-point for pipeline execution. This method will start the pipeline but will not
       * wait for it's execution to finish. If blocking execution is required, use the {@link
       * PubSubToBigQuery#run(Options)} method to start the pipeline and invoke {@code
       * result.waitUntilFinish()} on the {@link PipelineResult}.
       *
       * @param args The command-line args passed by the executor.
       */
      public static void main(String[] args) {
        UncaughtExceptionLogger.register();
    
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
        BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);
        //    options.setWorkerDiskType(
        //
        // "compute.googleapis.com/projects/cloud-teleport-testing/zones/us-central1-a/diskTypes/t2a-test");
    
        run(options);
      }
    
      /**
       * Runs the pipeline to completion with the specified options. This method does not wait until the
       * pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
       * object to block until the pipeline is finished running if blocking programmatic execution is
       * required.
       *
       * @param options The execution options.
       * @return The pipeline result.
       */
      public static PipelineResult run(Options options) {
    
        boolean useInputSubscription = !Strings.isNullOrEmpty(options.getInputSubscription());
        boolean useInputTopic = !Strings.isNullOrEmpty(options.getInputTopic());
        if (useInputSubscription == useInputTopic) {
          throw new IllegalArgumentException(
              "Either input topic or input subscription must be provided, but not both.");
        }
    
        Pipeline pipeline = Pipeline.create(options);
    
        CoderRegistry coderRegistry = pipeline.getCoderRegistry();
        coderRegistry.registerCoderForType(CODER.getEncodedTypeDescriptor(), CODER);
    
        /*
         * Steps:
         *  1) Read messages in from Pub/Sub
         *  2) Transform the PubsubMessages into TableRows
         *     - Transform message payload via UDF
         *     - Convert UDF result to TableRow objects
         *  3) Write successful records out to BigQuery
         *  4) Write failed records out to BigQuery
         */
    
        /*
         * Step #1: Read messages in from Pub/Sub
         * Either from a Subscription or Topic
         */
    
        PCollection<PubsubMessage> messages = null;
        if (useInputSubscription) {
          messages =
              pipeline.apply(
                  "ReadPubSubSubscription",
                  PubsubIO.readMessagesWithAttributes()
                      .fromSubscription(options.getInputSubscription()));
        } else {
          messages =
              pipeline.apply(
                  "ReadPubSubTopic",
                  PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()));
        }
    
        PCollectionTuple convertedTableRows =
            messages
                /*
                 * Step #2: Transform the PubsubMessages into TableRows
                 */
                .apply("ConvertMessageToTableRow", new PubsubMessageToTableRow(options));
    
        /*
         * Step #3: Write the successful records out to BigQuery
         */
        WriteResult writeResult =
            convertedTableRows
                .get(TRANSFORM_OUT)
                .apply(
                    "WriteSuccessfulRecords",
                    BigQueryIO.writeTableRows()
                        .withoutValidation()
                        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                        .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                        .withExtendedErrorInfo()
                        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                        .to(options.getOutputTableSpec()));
    
        /*
         * Step 3 Contd.
         * Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
         */
        PCollection<FailsafeElement<String, String>> failedInserts =
            BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
                .apply(
                    "WrapInsertionErrors",
                    MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                        .via((BigQueryInsertError e) -> wrapBigQueryInsertError(e)))
                .setCoder(FAILSAFE_ELEMENT_CODER);
    
        /*
         * Step #4: Write records that failed table row transformation
         * or conversion out to BigQuery deadletter table.
         */
        PCollectionList.of(
                ImmutableList.of(
                    convertedTableRows.get(UDF_DEADLETTER_OUT),
                    convertedTableRows.get(TRANSFORM_DEADLETTER_OUT)))
            .apply("Flatten", Flatten.pCollections())
            .apply(
                "WriteFailedRecords",
                ErrorConverters.WritePubsubMessageErrors.newBuilder()
                    .setErrorRecordsTable(
                        !Strings.isNullOrEmpty(options.getOutputDeadletterTable())
                            ? options.getOutputDeadletterTable()
                            : options.getOutputTableSpec() + DEFAULT_DEADLETTER_TABLE_SUFFIX)
                    .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
                    .build());
    
        // 5) Insert records that failed insert into deadletter table
        failedInserts.apply(
            "WriteFailedRecords",
            ErrorConverters.WriteStringMessageErrors.newBuilder()
                .setErrorRecordsTable(
                    !Strings.isNullOrEmpty(options.getOutputDeadletterTable())
                        ? options.getOutputDeadletterTable()
                        : options.getOutputTableSpec() + DEFAULT_DEADLETTER_TABLE_SUFFIX)
                .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
                .build());
    
        return pipeline.run();
      }
    
      /**
       * The {@link PubsubMessageToTableRow} class is a {@link PTransform} which transforms incoming
       * {@link PubsubMessage} objects into {@link TableRow} objects for insertion into BigQuery while
       * applying an optional UDF to the input. The executions of the UDF and transformation to {@link
       * TableRow} objects is done in a fail-safe way by wrapping the element with it's original payload
       * inside the {@link FailsafeElement} class. The {@link PubsubMessageToTableRow} transform will
       * output a {@link PCollectionTuple} which contains all output and dead-letter {@link
       * PCollection}.
       *
       * <p>The {@link PCollectionTuple} output will contain the following {@link PCollection}:
       *
       * <ul>
       *   <li>{@link PubSubToBigQuery#UDF_OUT} - Contains all {@link FailsafeElement} records
       *       successfully processed by the optional UDF.
       *   <li>{@link PubSubToBigQuery#UDF_DEADLETTER_OUT} - Contains all {@link FailsafeElement}
       *       records which failed processing during the UDF execution.
       *   <li>{@link PubSubToBigQuery#TRANSFORM_OUT} - Contains all records successfully converted from
       *       JSON to {@link TableRow} objects.
       *   <li>{@link PubSubToBigQuery#TRANSFORM_DEADLETTER_OUT} - Contains all {@link FailsafeElement}
       *       records which couldn't be converted to table rows.
       * </ul>
       */
      static class PubsubMessageToTableRow
          extends PTransform<PCollection<PubsubMessage>, PCollectionTuple> {
    
        private final Options options;
    
        PubsubMessageToTableRow(Options options) {
          this.options = options;
        }
    
        @Override
        public PCollectionTuple expand(PCollection<PubsubMessage> input) {
          boolean useJavascriptUdf =
              !Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath());
          boolean usePythonUdf =
              !Strings.isNullOrEmpty(options.getPythonExternalTextTransformGcsPath());
          if (useJavascriptUdf && usePythonUdf) {
            throw new IllegalArgumentException(
                "Either javascript or Python gcs path must be provided, but not both.");
          }
          PCollectionTuple udfOut;
          if (usePythonUdf) {
            PCollection<Row> udfRowsOut =
                input
                    // Map the incoming messages into FailsafeElements so we can recover from failures
                    // across multiple transforms.
                    .apply(
                        "MapToRecord",
                        PythonExternalTextTransformer.FailsafeRowPythonExternalUdf
                            .pubSubMappingFunction())
                    .setRowSchema(PythonExternalTextTransformer.FailsafeRowPythonExternalUdf.ROW_SCHEMA)
                    .apply(
                        "InvokeUDF",
                        PythonExternalTextTransformer.FailsafePythonExternalUdf.newBuilder()
                            .setFileSystemPath(options.getPythonExternalTextTransformGcsPath())
                            .setFunctionName(options.getPythonExternalTextTransformFunctionName())
                            .build());
            udfOut =
                udfRowsOut.apply(
                    "MapRowsToFailsafeElements",
                    ParDo.of(new RowToPubSubFailsafeElementFn(UDF_OUT, UDF_DEADLETTER_OUT))
                        .withOutputTags(UDF_OUT, TupleTagList.of(UDF_DEADLETTER_OUT)));
          } else {
            udfOut =
                input
                    // Map the incoming messages into FailsafeElements so we can recover from failures
                    // across multiple transforms.
                    .apply("MapToRecord", ParDo.of(new PubsubMessageToFailsafeElementFn()))
                    .apply(
                        "InvokeUDF",
                        FailsafeJavascriptUdf.<PubsubMessage>newBuilder()
                            .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
                            .setFunctionName(options.getJavascriptTextTransformFunctionName())
                            .setReloadIntervalMinutes(
                                options.getJavascriptTextTransformReloadIntervalMinutes())
                            .setSuccessTag(UDF_OUT)
                            .setFailureTag(UDF_DEADLETTER_OUT)
                            .build());
          }
    
          // Convert the records which were successfully processed by the UDF into TableRow objects.
          PCollectionTuple jsonToTableRowOut =
              udfOut
                  .get(UDF_OUT)
                  .apply(
                      "JsonToTableRow",
                      FailsafeJsonToTableRow.<PubsubMessage>newBuilder()
                          .setSuccessTag(TRANSFORM_OUT)
                          .setFailureTag(TRANSFORM_DEADLETTER_OUT)
                          .build());
    
          // Re-wrap the PCollections so we can return a single PCollectionTuple
          return PCollectionTuple.of(UDF_OUT, udfOut.get(UDF_OUT))
              .and(UDF_DEADLETTER_OUT, udfOut.get(UDF_DEADLETTER_OUT))
              .and(TRANSFORM_OUT, jsonToTableRowOut.get(TRANSFORM_OUT))
              .and(TRANSFORM_DEADLETTER_OUT, jsonToTableRowOut.get(TRANSFORM_DEADLETTER_OUT));
        }
      }
    
      /**
       * The {@link PubsubMessageToFailsafeElementFn} wraps an incoming {@link PubsubMessage} with the
       * {@link FailsafeElement} class so errors can be recovered from and the original message can be
       * output to a error records table.
       */
      static class PubsubMessageToFailsafeElementFn
          extends DoFn<PubsubMessage, FailsafeElement<PubsubMessage, String>> {
        @ProcessElement
        public void processElement(ProcessContext context) {
          PubsubMessage message = context.element();
          context.output(
              FailsafeElement.of(message, new String(message.getPayload(), StandardCharsets.UTF_8)));
        }
      }
    }
    

    Passaggi successivi