Modelo de assinatura do Pub/Sub para BigQuery

O modelo de assinatura do Pub/Sub para BigQuery é um pipeline de streaming que lê mensagens formatadas em JSON de uma assinatura de Pub/Sub e as grava em uma tabela do BigQuery. É possível usar o modelo como uma solução rápida para mover dados do Pub/Sub para BigQuery. O modelo lê mensagens em formato JSON do Pub/Sub e as converte em elementos do BigQuery.

Requisitos de pipeline

  • O campo data das mensagens do Pub/Sub precisa usar o formato JSON, conforme descrito neste guia JSON. Por exemplo, mensagens com valores no campo data formatados como {"k1":"v1", "k2":"v2"} podem ser inseridos em uma tabela do BigQuery com duas colunas, k1 e k2, com um tipo de dados de string.
  • O diretório de saída precisa ser criado antes de executar o pipeline. O esquema da tabela precisa corresponder aos objetos JSON de entrada.

Parâmetros do modelo

Parâmetros obrigatórios

  • outputTableSpec: o local da tabela de saída do BigQuery, no formato <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>.
  • inputSubscription: a assinatura de entrada do Pub/Sub que será lida, no formato projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION>.

Parâmetros opcionais

  • outputDeadletterTable: a tabela do BigQuery a ser usada para mensagens que não alcançaram a tabela de saída, no formato de <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>. Se a tabela não existir, ela será criada durante a execução do pipeline. Se não for especificado, OUTPUT_TABLE_SPEC_error_records será usado.
  • javascriptTextTransformGcsPath: o URI do Cloud Storage do arquivo .js que define a função JavaScript definida pelo usuário (UDF) a ser usada. Por exemplo, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: o nome da função definida pelo usuário (UDF) do JavaScript a ser usada. Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para ver exemplos de UDFs em JavaScript, consulte os exemplos de UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: defina o intervalo que os workers podem verificar se há alterações na UDF em JavaScript para recarregar os arquivos. Padrão: 0.

Função definida pelo usuário

Também é possível estender esse modelo escrevendo uma função definida pelo usuário (UDF). O modelo chama a UDF para cada elemento de entrada. Os payloads dos elementos são serializados como strings JSON. Para mais informações, consulte Criar funções definidas pelo usuário para modelos do Dataflow.

Especificação da função

A UDF tem a seguinte especificação:

  • Entrada: o campo de dados da mensagem do Pub/Sub, serializado como uma string JSON.
  • Saída: uma string JSON que corresponde ao esquema da tabela de destino do BigQuery.
  • Executar o modelo

    1. Acesse a página Criar job usando um modelo do Dataflow.
    2. Acesse Criar job usando um modelo
    3. No campo Nome do job, insira um nome exclusivo.
    4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

      Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

    5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub Subscription to BigQuery template.
    6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
    7. Opcional: para alternar do processamento "Exatamente uma vez" para o modo de streaming "Pelo menos uma vez", selecione Pelo menos uma vez.
    8. Cliquem em Executar job.

    No shell ou no terminal, execute o modelo:

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/PubSub_Subscription_to_BigQuery \
        --region REGION_NAME \
        --staging-location STAGING_LOCATION \
        --parameters \
    inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
    outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
    outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

    Substitua:

    • JOB_NAME: um nome de job de sua escolha
    • REGION_NAME: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
    • VERSION: a versão do modelo que você quer usar

      Use estes valores:

    • STAGING_LOCATION: o local para fase de testes de arquivos locais (por exemplo, gs://your-bucket/staging)
    • SUBSCRIPTION_NAME: o nome da sua assinatura de Pub/Sub
    • DATASET: o conjunto de dados do BigQuery
    • TABLE_NAME: o nome da tabela do BigQuery

    Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/PubSub_Subscription_to_BigQuery
    {
       "jobName": "JOB_NAME",
       "parameters": {
           "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
           "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
       },
       "environment": {
           "ipConfiguration": "WORKER_IP_UNSPECIFIED",
           "additionalExperiments": []
       },
    }

    Substitua:

    • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
    • JOB_NAME: um nome de job de sua escolha
    • LOCATION: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
    • VERSION: a versão do modelo que você quer usar

      Use estes valores:

    • STAGING_LOCATION: o local para fase de testes de arquivos locais (por exemplo, gs://your-bucket/staging)
    • SUBSCRIPTION_NAME: o nome da sua assinatura de Pub/Sub
    • DATASET: o conjunto de dados do BigQuery
    • TABLE_NAME: o nome da tabela do BigQuery
    Java
    /*
     * Copyright (C) 2018 Google LLC
     *
     * Licensed under the Apache License, Version 2.0 (the "License"); you may not
     * use this file except in compliance with the License. You may obtain a copy of
     * the License at
     *
     *   http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
     * License for the specific language governing permissions and limitations under
     * the License.
     */
    package com.google.cloud.teleport.templates;
    
    import static com.google.cloud.teleport.templates.TextToBigQueryStreaming.wrapBigQueryInsertError;
    
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.cloud.teleport.coders.FailsafeElementCoder;
    import com.google.cloud.teleport.metadata.Template;
    import com.google.cloud.teleport.metadata.TemplateCategory;
    import com.google.cloud.teleport.metadata.TemplateCreationParameter;
    import com.google.cloud.teleport.metadata.TemplateParameter;
    import com.google.cloud.teleport.templates.PubSubToBigQuery.Options;
    import com.google.cloud.teleport.templates.common.BigQueryConverters.FailsafeJsonToTableRow;
    import com.google.cloud.teleport.templates.common.ErrorConverters;
    import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.FailsafeJavascriptUdf;
    import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.JavascriptTextTransformerOptions;
    import com.google.cloud.teleport.util.DualInputNestedValueProvider;
    import com.google.cloud.teleport.util.DualInputNestedValueProvider.TranslatorInput;
    import com.google.cloud.teleport.util.ResourceUtils;
    import com.google.cloud.teleport.util.ValueProviderUtils;
    import com.google.cloud.teleport.values.FailsafeElement;
    import com.google.common.collect.ImmutableList;
    import java.nio.charset.StandardCharsets;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.coders.CoderRegistry;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
    import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
    import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.options.ValueProvider;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.Flatten;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionList;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.TupleTag;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * The {@link PubSubToBigQuery} pipeline is a streaming pipeline which ingests data in JSON format
     * from Cloud Pub/Sub, executes a UDF, and outputs the resulting records to BigQuery. Any errors
     * which occur in the transformation of the data or execution of the UDF will be output to a
     * separate errors table in BigQuery. The errors table will be created if it does not exist prior to
     * execution. Both output and error tables are specified by the user as template parameters.
     *
     * <p><b>Pipeline Requirements</b>
     *
     * <ul>
     *   <li>The Pub/Sub topic exists.
     *   <li>The BigQuery output table exists.
     * </ul>
     *
     * <p>Check out <a
     * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_PubSub_Subscription_to_BigQuery.md">README
     * for Subscription</a> or <a
     * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_PubSub_to_BigQuery.md">README
     * for Topic</a> for instructions on how to use or modify this template.
     */
    @Template(
        name = "PubSub_Subscription_to_BigQuery",
        category = TemplateCategory.STREAMING,
        displayName = "Pub/Sub Subscription to BigQuery",
        description =
            "The Pub/Sub Subscription to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Pub/Sub subscription and writes them to a BigQuery table. "
                + "You can use the template as a quick solution to move Pub/Sub data to BigQuery. "
                + "The template reads JSON-formatted messages from Pub/Sub and converts them to BigQuery elements.",
        optionsClass = Options.class,
        skipOptions = "inputTopic",
        documentation =
            "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-subscription-to-bigquery",
        contactInformation = "https://cloud.google.com/support",
        requirements = {
          "The <a href=\"https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage\">`data` field</a> of Pub/Sub messages must use the JSON format, described in this <a href=\"https://developers.google.com/api-client-library/java/google-http-java-client/json\">JSON guide</a>. For example, messages with values in the `data` field formatted as `{\"k1\":\"v1\", \"k2\":\"v2\"}` can be inserted into a BigQuery table with two columns, named `k1` and `k2`, with a string data type.",
          "The output table must exist prior to running the pipeline. The table schema must match the input JSON objects."
        },
        streaming = true,
        supportsAtLeastOnce = true,
        supportsExactlyOnce = true)
    @Template(
        name = "PubSub_to_BigQuery",
        category = TemplateCategory.STREAMING,
        displayName = "Pub/Sub Topic to BigQuery",
        description =
            "The Pub/Sub Topic to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Pub/Sub topic and writes them to a BigQuery table. "
                + "You can use the template as a quick solution to move Pub/Sub data to BigQuery. "
                + "The template reads JSON-formatted messages from Pub/Sub and converts them to BigQuery elements.",
        optionsClass = Options.class,
        skipOptions = "inputSubscription",
        documentation =
            "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-bigquery",
        contactInformation = "https://cloud.google.com/support",
        requirements = {
          "The <a href=\"https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage\">`data` field</a> of Pub/Sub messages must use the JSON format, described in this <a href=\"https://developers.google.com/api-client-library/java/google-http-java-client/json\">JSON guide</a>. For example, messages with values in the `data` field formatted as `{\"k1\":\"v1\", \"k2\":\"v2\"}` can be inserted into a BigQuery table with two columns, named `k1` and `k2`, with a string data type.",
          "The output table must exist prior to running the pipeline. The table schema must match the input JSON objects."
        },
        hidden = true,
        streaming = true,
        supportsAtLeastOnce = true)
    public class PubSubToBigQuery {
    
      /** The log to output status messages to. */
      private static final Logger LOG = LoggerFactory.getLogger(PubSubToBigQuery.class);
    
      /** The tag for the main output for the UDF. */
      public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_OUT =
          new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
    
      /** The tag for the main output of the json transformation. */
      public static final TupleTag<TableRow> TRANSFORM_OUT = new TupleTag<TableRow>() {};
    
      /** The tag for the dead-letter output of the udf. */
      public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_DEADLETTER_OUT =
          new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
    
      /** The tag for the dead-letter output of the json to table row transform. */
      public static final TupleTag<FailsafeElement<PubsubMessage, String>> TRANSFORM_DEADLETTER_OUT =
          new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
    
      /** The default suffix for error tables if dead letter table is not specified. */
      public static final String DEFAULT_DEADLETTER_TABLE_SUFFIX = "_error_records";
    
      /** Pubsub message/string coder for pipeline. */
      public static final FailsafeElementCoder<PubsubMessage, String> CODER =
          FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());
    
      /** String/String Coder for FailsafeElement. */
      public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
          FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
    
      /**
       * The {@link Options} class provides the custom execution options passed by the executor at the
       * command-line.
       */
      public interface Options extends PipelineOptions, JavascriptTextTransformerOptions {
        @TemplateParameter.BigQueryTable(
            order = 1,
            description = "BigQuery output table",
            helpText =
                "The BigQuery output table location, in the format `<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>`")
        ValueProvider<String> getOutputTableSpec();
    
        void setOutputTableSpec(ValueProvider<String> value);
    
        @TemplateParameter.PubsubTopic(
            order = 2,
            description = "Input Pub/Sub topic",
            helpText = "The Pub/Sub topic to read the input from.")
        ValueProvider<String> getInputTopic();
    
        void setInputTopic(ValueProvider<String> value);
    
        @TemplateParameter.PubsubSubscription(
            order = 3,
            description = "Pub/Sub input subscription",
            helpText =
                "The Pub/Sub input subscription to read from, in the format `projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION>`.")
        ValueProvider<String> getInputSubscription();
    
        void setInputSubscription(ValueProvider<String> value);
    
        @TemplateCreationParameter(template = "PubSub_to_BigQuery", value = "false")
        @TemplateCreationParameter(template = "PubSub_Subscription_to_BigQuery", value = "true")
        @Description(
            "This determines whether the template reads from a Pub/sub subscription or a topic")
        @Default.Boolean(false)
        Boolean getUseSubscription();
    
        void setUseSubscription(Boolean value);
    
        @TemplateParameter.BigQueryTable(
            order = 5,
            optional = true,
            description =
                "Table for messages failed to reach the output table (i.e., Deadletter table)",
            helpText =
                "The BigQuery table to use for messages that fail to reach the output table, in the format of `<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>`. If the table doesn't exist, it is created during pipeline execution. If not specified, `OUTPUT_TABLE_SPEC_error_records` is used.")
        ValueProvider<String> getOutputDeadletterTable();
    
        void setOutputDeadletterTable(ValueProvider<String> value);
      }
    
      /**
       * The main entry-point for pipeline execution. This method will start the pipeline but will not
       * wait for it's execution to finish. If blocking execution is required, use the {@link
       * PubSubToBigQuery#run(Options)} method to start the pipeline and invoke {@code
       * result.waitUntilFinish()} on the {@link PipelineResult}.
       *
       * @param args The command-line args passed by the executor.
       */
      public static void main(String[] args) {
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    
        run(options);
      }
    
      /**
       * Runs the pipeline to completion with the specified options. This method does not wait until the
       * pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
       * object to block until the pipeline is finished running if blocking programmatic execution is
       * required.
       *
       * @param options The execution options.
       * @return The pipeline result.
       */
      public static PipelineResult run(Options options) {
    
        Pipeline pipeline = Pipeline.create(options);
    
        CoderRegistry coderRegistry = pipeline.getCoderRegistry();
        coderRegistry.registerCoderForType(CODER.getEncodedTypeDescriptor(), CODER);
    
        /*
         * Steps:
         *  1) Read messages in from Pub/Sub
         *  2) Transform the PubsubMessages into TableRows
         *     - Transform message payload via UDF
         *     - Convert UDF result to TableRow objects
         *  3) Write successful records out to BigQuery
         *  4) Write failed records out to BigQuery
         */
    
        /*
         * Step #1: Read messages in from Pub/Sub
         * Either from a Subscription or Topic
         */
    
        PCollection<PubsubMessage> messages = null;
        if (options.getUseSubscription()) {
          messages =
              pipeline.apply(
                  "ReadPubSubSubscription",
                  PubsubIO.readMessagesWithAttributes()
                      .fromSubscription(options.getInputSubscription()));
        } else {
          messages =
              pipeline.apply(
                  "ReadPubSubTopic",
                  PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()));
        }
    
        PCollectionTuple convertedTableRows =
            messages
                /*
                 * Step #2: Transform the PubsubMessages into TableRows
                 */
                .apply("ConvertMessageToTableRow", new PubsubMessageToTableRow(options));
    
        /*
         * Step #3: Write the successful records out to BigQuery
         */
        WriteResult writeResult =
            convertedTableRows
                .get(TRANSFORM_OUT)
                .apply(
                    "WriteSuccessfulRecords",
                    BigQueryIO.writeTableRows()
                        .withoutValidation()
                        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                        .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                        .withExtendedErrorInfo()
                        .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                        .to(options.getOutputTableSpec()));
    
        /*
         * Step 3 Contd.
         * Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
         */
        PCollection<FailsafeElement<String, String>> failedInserts =
            writeResult
                .getFailedInsertsWithErr()
                .apply(
                    "WrapInsertionErrors",
                    MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                        .via((BigQueryInsertError e) -> wrapBigQueryInsertError(e)))
                .setCoder(FAILSAFE_ELEMENT_CODER);
    
        /*
         * Step #4: Write records that failed table row transformation
         * or conversion out to BigQuery deadletter table.
         */
        PCollectionList.of(
                ImmutableList.of(
                    convertedTableRows.get(UDF_DEADLETTER_OUT),
                    convertedTableRows.get(TRANSFORM_DEADLETTER_OUT)))
            .apply("Flatten", Flatten.pCollections())
            .apply(
                "WriteFailedRecords",
                ErrorConverters.WritePubsubMessageErrors.newBuilder()
                    .setErrorRecordsTable(
                        ValueProviderUtils.maybeUseDefaultDeadletterTable(
                            options.getOutputDeadletterTable(),
                            options.getOutputTableSpec(),
                            DEFAULT_DEADLETTER_TABLE_SUFFIX))
                    .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
                    .build());
    
        // 5) Insert records that failed insert into deadletter table
        failedInserts.apply(
            "WriteFailedRecords",
            ErrorConverters.WriteStringMessageErrors.newBuilder()
                .setErrorRecordsTable(
                    ValueProviderUtils.maybeUseDefaultDeadletterTable(
                        options.getOutputDeadletterTable(),
                        options.getOutputTableSpec(),
                        DEFAULT_DEADLETTER_TABLE_SUFFIX))
                .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
                .build());
    
        return pipeline.run();
      }
    
      /**
       * If deadletterTable is available, it is returned as is, otherwise outputTableSpec +
       * defaultDeadLetterTableSuffix is returned instead.
       */
      private static ValueProvider<String> maybeUseDefaultDeadletterTable(
          ValueProvider<String> deadletterTable,
          ValueProvider<String> outputTableSpec,
          String defaultDeadLetterTableSuffix) {
        return DualInputNestedValueProvider.of(
            deadletterTable,
            outputTableSpec,
            new SerializableFunction<TranslatorInput<String, String>, String>() {
              @Override
              public String apply(TranslatorInput<String, String> input) {
                String userProvidedTable = input.getX();
                String outputTableSpec = input.getY();
                if (userProvidedTable == null) {
                  return outputTableSpec + defaultDeadLetterTableSuffix;
                }
                return userProvidedTable;
              }
            });
      }
    
      /**
       * The {@link PubsubMessageToTableRow} class is a {@link PTransform} which transforms incoming
       * {@link PubsubMessage} objects into {@link TableRow} objects for insertion into BigQuery while
       * applying an optional UDF to the input. The executions of the UDF and transformation to {@link
       * TableRow} objects is done in a fail-safe way by wrapping the element with it's original payload
       * inside the {@link FailsafeElement} class. The {@link PubsubMessageToTableRow} transform will
       * output a {@link PCollectionTuple} which contains all output and dead-letter {@link
       * PCollection}.
       *
       * <p>The {@link PCollectionTuple} output will contain the following {@link PCollection}:
       *
       * <ul>
       *   <li>{@link PubSubToBigQuery#UDF_OUT} - Contains all {@link FailsafeElement} records
       *       successfully processed by the optional UDF.
       *   <li>{@link PubSubToBigQuery#UDF_DEADLETTER_OUT} - Contains all {@link FailsafeElement}
       *       records which failed processing during the UDF execution.
       *   <li>{@link PubSubToBigQuery#TRANSFORM_OUT} - Contains all records successfully converted from
       *       JSON to {@link TableRow} objects.
       *   <li>{@link PubSubToBigQuery#TRANSFORM_DEADLETTER_OUT} - Contains all {@link FailsafeElement}
       *       records which couldn't be converted to table rows.
       * </ul>
       */
      static class PubsubMessageToTableRow
          extends PTransform<PCollection<PubsubMessage>, PCollectionTuple> {
    
        private final Options options;
    
        PubsubMessageToTableRow(Options options) {
          this.options = options;
        }
    
        @Override
        public PCollectionTuple expand(PCollection<PubsubMessage> input) {
    
          PCollectionTuple udfOut =
              input
                  // Map the incoming messages into FailsafeElements so we can recover from failures
                  // across multiple transforms.
                  .apply("MapToRecord", ParDo.of(new PubsubMessageToFailsafeElementFn()))
                  .apply(
                      "InvokeUDF",
                      FailsafeJavascriptUdf.<PubsubMessage>newBuilder()
                          .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
                          .setFunctionName(options.getJavascriptTextTransformFunctionName())
                          .setReloadIntervalMinutes(
                              options.getJavascriptTextTransformReloadIntervalMinutes())
                          .setSuccessTag(UDF_OUT)
                          .setFailureTag(UDF_DEADLETTER_OUT)
                          .build());
    
          // Convert the records which were successfully processed by the UDF into TableRow objects.
          PCollectionTuple jsonToTableRowOut =
              udfOut
                  .get(UDF_OUT)
                  .apply(
                      "JsonToTableRow",
                      FailsafeJsonToTableRow.<PubsubMessage>newBuilder()
                          .setSuccessTag(TRANSFORM_OUT)
                          .setFailureTag(TRANSFORM_DEADLETTER_OUT)
                          .build());
    
          // Re-wrap the PCollections so we can return a single PCollectionTuple
          return PCollectionTuple.of(UDF_OUT, udfOut.get(UDF_OUT))
              .and(UDF_DEADLETTER_OUT, udfOut.get(UDF_DEADLETTER_OUT))
              .and(TRANSFORM_OUT, jsonToTableRowOut.get(TRANSFORM_OUT))
              .and(TRANSFORM_DEADLETTER_OUT, jsonToTableRowOut.get(TRANSFORM_DEADLETTER_OUT));
        }
      }
    
      /**
       * The {@link PubsubMessageToFailsafeElementFn} wraps an incoming {@link PubsubMessage} with the
       * {@link FailsafeElement} class so errors can be recovered from and the original message can be
       * output to a error records table.
       */
      static class PubsubMessageToFailsafeElementFn
          extends DoFn<PubsubMessage, FailsafeElement<PubsubMessage, String>> {
        @ProcessElement
        public void processElement(ProcessContext context) {
          PubsubMessage message = context.element();
          context.output(
              FailsafeElement.of(message, new String(message.getPayload(), StandardCharsets.UTF_8)));
        }
      }
    }
    

    A seguir