Plantilla de suscripción de Pub/Sub a BigQuery

La plantilla de suscripción de Pub/Sub a BigQuery es una canalización de transmisión que lee mensajes con formato JSON desde una suscripción de Pub/Sub y los escribe en una tabla de BigQuery. Puedes usar la plantilla como una solución rápida para mover datos de Pub/Sub a BigQuery. La plantilla lee los mensajes con formato JSON de Pub/Sub y los convierte en elementos de BigQuery.

Requisitos de la canalización

  • El campo data de los mensajes de Pub/Sub debe usar el formato JSON, que se describe en esta guía de JSON. Por ejemplo, los mensajes con valores en el campo data con formato {"k1":"v1", "k2":"v2"} se pueden insertar en una tabla de BigQuery con dos columnas, llamadas k1 y k2, con un tipo de datos de string.
  • La tabla de salida debe existir antes de ejecutar la canalización. El esquema de la tabla debe coincidir con los objetos JSON de entrada.

Parámetros de la plantilla

Parámetros obligatorios

  • outputTableSpec: La ubicación de la tabla de resultados de BigQuery, en el formato <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>.
  • inputSubscription: La suscripción de entrada de Pub/Sub desde la que se va a leer, en el formato projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION>.

Parámetros opcionales

  • outputDeadletterTable: La tabla de BigQuery que se usará para los mensajes que no llegan a la tabla de salida, en el formato <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>. Si la tabla no existe, se crea durante la ejecución de la canalización. Si no se especifica, se usa OUTPUT_TABLE_SPEC_error_records.
  • javascriptTextTransformGcsPath: El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que se usará. Por ejemplo, gs://my-bucket/my-udfs/my_file.js
  • javascriptTextTransformFunctionName: Es el nombre de la función definida por el usuario (UDF) de JavaScript que se usará. Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: Define el intervalo que los trabajadores pueden verificar para detectar cambios en la UDF de JavaScript a fin de volver a cargar los archivos. La configuración predeterminada es 0.

Función definida por el usuario

Para extender esta plantilla, puedes escribir una función definida por el usuario (UDF). La plantilla llama a la UDF para cada elemento de entrada. Las cargas útiles de elementos se serializan como cadenas JSON. Para obtener más información, consulta Crea funciones definidas por el usuario para plantillas de Dataflow.

Especificación de la función

La UDF tiene la siguiente especificación:

  • Entrada: el campo de datos del mensaje de Pub/Sub, serializado como una cadena JSON
  • Resultado: Una cadena JSON que coincide con el esquema de la tabla de destino de BigQuery.
  • Ejecuta la plantilla

    1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
    2. Ir a Crear un trabajo a partir de una plantilla
    3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
    4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

      Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

    5. En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub Subscription to BigQuery template.
    6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
    7. Opcional: Para cambiar del procesamiento “exactamente una vez” al modo de transmisión al menos una vez, selecciona Al menos una vez.
    8. Haz clic en Ejecutar trabajo.

    En tu shell o terminal, ejecuta la plantilla:

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/PubSub_Subscription_to_BigQuery \
        --region REGION_NAME \
        --staging-location STAGING_LOCATION \
        --parameters \
    inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
    outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
    outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

    Reemplaza lo siguiente:

    • JOB_NAME: Es el nombre del trabajo que elijas
    • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
    • VERSION: Es la versión de la plantilla que deseas usar.

      Puedes usar los siguientes valores:

      • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
      • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
    • STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
    • SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
    • DATASET: Es el conjunto de datos de BigQuery.
    • TABLE_NAME: Es el nombre de la tabla de BigQuery.

    Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/PubSub_Subscription_to_BigQuery
    {
       "jobName": "JOB_NAME",
       "parameters": {
           "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
           "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
       },
       "environment": {
           "ipConfiguration": "WORKER_IP_UNSPECIFIED",
           "additionalExperiments": []
       },
    }

    Reemplaza lo siguiente:

    • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
    • JOB_NAME: Es el nombre del trabajo que elijas
    • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
    • VERSION: Es la versión de la plantilla que deseas usar.

      Puedes usar los siguientes valores:

      • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
      • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
    • STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
    • SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
    • DATASET: Es el conjunto de datos de BigQuery.
    • TABLE_NAME: Es el nombre de la tabla de BigQuery.
    Java
    /*
     * Copyright (C) 2018 Google LLC
     *
     * Licensed under the Apache License, Version 2.0 (the "License"); you may not
     * use this file except in compliance with the License. You may obtain a copy of
     * the License at
     *
     *   http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
     * License for the specific language governing permissions and limitations under
     * the License.
     */
    package com.google.cloud.teleport.templates;
    
    import static com.google.cloud.teleport.templates.TextToBigQueryStreaming.wrapBigQueryInsertError;
    
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.cloud.teleport.coders.FailsafeElementCoder;
    import com.google.cloud.teleport.metadata.Template;
    import com.google.cloud.teleport.metadata.TemplateCategory;
    import com.google.cloud.teleport.metadata.TemplateCreationParameter;
    import com.google.cloud.teleport.metadata.TemplateParameter;
    import com.google.cloud.teleport.templates.PubSubToBigQuery.Options;
    import com.google.cloud.teleport.templates.common.BigQueryConverters.FailsafeJsonToTableRow;
    import com.google.cloud.teleport.templates.common.ErrorConverters;
    import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.FailsafeJavascriptUdf;
    import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.JavascriptTextTransformerOptions;
    import com.google.cloud.teleport.util.DualInputNestedValueProvider;
    import com.google.cloud.teleport.util.DualInputNestedValueProvider.TranslatorInput;
    import com.google.cloud.teleport.util.ResourceUtils;
    import com.google.cloud.teleport.util.ValueProviderUtils;
    import com.google.cloud.teleport.values.FailsafeElement;
    import com.google.common.collect.ImmutableList;
    import java.nio.charset.StandardCharsets;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.coders.CoderRegistry;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
    import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
    import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.options.ValueProvider;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.Flatten;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionList;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.TupleTag;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * The {@link PubSubToBigQuery} pipeline is a streaming pipeline which ingests data in JSON format
     * from Cloud Pub/Sub, executes a UDF, and outputs the resulting records to BigQuery. Any errors
     * which occur in the transformation of the data or execution of the UDF will be output to a
     * separate errors table in BigQuery. The errors table will be created if it does not exist prior to
     * execution. Both output and error tables are specified by the user as template parameters.
     *
     * <p><b>Pipeline Requirements</b>
     *
     * <ul>
     *   <li>The Pub/Sub topic exists.
     *   <li>The BigQuery output table exists.
     * </ul>
     *
     * <p>Check out <a
     * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_PubSub_Subscription_to_BigQuery.md">README
     * for Subscription</a> or <a
     * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_PubSub_to_BigQuery.md">README
     * for Topic</a> for instructions on how to use or modify this template.
     */
    @Template(
        name = "PubSub_Subscription_to_BigQuery",
        category = TemplateCategory.STREAMING,
        displayName = "Pub/Sub Subscription to BigQuery",
        description =
            "The Pub/Sub Subscription to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Pub/Sub subscription and writes them to a BigQuery table. "
                + "You can use the template as a quick solution to move Pub/Sub data to BigQuery. "
                + "The template reads JSON-formatted messages from Pub/Sub and converts them to BigQuery elements.",
        optionsClass = Options.class,
        skipOptions = "inputTopic",
        documentation =
            "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-subscription-to-bigquery",
        contactInformation = "https://cloud.google.com/support",
        requirements = {
          "The <a href=\"https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage\">`data` field</a> of Pub/Sub messages must use the JSON format, described in this <a href=\"https://developers.google.com/api-client-library/java/google-http-java-client/json\">JSON guide</a>. For example, messages with values in the `data` field formatted as `{\"k1\":\"v1\", \"k2\":\"v2\"}` can be inserted into a BigQuery table with two columns, named `k1` and `k2`, with a string data type.",
          "The output table must exist prior to running the pipeline. The table schema must match the input JSON objects."
        },
        streaming = true,
        supportsAtLeastOnce = true,
        supportsExactlyOnce = true)
    @Template(
        name = "PubSub_to_BigQuery",
        category = TemplateCategory.STREAMING,
        displayName = "Pub/Sub Topic to BigQuery",
        description =
            "The Pub/Sub Topic to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Pub/Sub topic and writes them to a BigQuery table. "
                + "You can use the template as a quick solution to move Pub/Sub data to BigQuery. "
                + "The template reads JSON-formatted messages from Pub/Sub and converts them to BigQuery elements.",
        optionsClass = Options.class,
        skipOptions = "inputSubscription",
        documentation =
            "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-bigquery",
        contactInformation = "https://cloud.google.com/support",
        requirements = {
          "The <a href=\"https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage\">`data` field</a> of Pub/Sub messages must use the JSON format, described in this <a href=\"https://developers.google.com/api-client-library/java/google-http-java-client/json\">JSON guide</a>. For example, messages with values in the `data` field formatted as `{\"k1\":\"v1\", \"k2\":\"v2\"}` can be inserted into a BigQuery table with two columns, named `k1` and `k2`, with a string data type.",
          "The output table must exist prior to running the pipeline. The table schema must match the input JSON objects."
        },
        hidden = true,
        streaming = true,
        supportsAtLeastOnce = true)
    public class PubSubToBigQuery {
    
      /** The log to output status messages to. */
      private static final Logger LOG = LoggerFactory.getLogger(PubSubToBigQuery.class);
    
      /** The tag for the main output for the UDF. */
      public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_OUT =
          new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
    
      /** The tag for the main output of the json transformation. */
      public static final TupleTag<TableRow> TRANSFORM_OUT = new TupleTag<TableRow>() {};
    
      /** The tag for the dead-letter output of the udf. */
      public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_DEADLETTER_OUT =
          new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
    
      /** The tag for the dead-letter output of the json to table row transform. */
      public static final TupleTag<FailsafeElement<PubsubMessage, String>> TRANSFORM_DEADLETTER_OUT =
          new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
    
      /** The default suffix for error tables if dead letter table is not specified. */
      public static final String DEFAULT_DEADLETTER_TABLE_SUFFIX = "_error_records";
    
      /** Pubsub message/string coder for pipeline. */
      public static final FailsafeElementCoder<PubsubMessage, String> CODER =
          FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());
    
      /** String/String Coder for FailsafeElement. */
      public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
          FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
    
      /**
       * The {@link Options} class provides the custom execution options passed by the executor at the
       * command-line.
       */
      public interface Options extends PipelineOptions, JavascriptTextTransformerOptions {
        @TemplateParameter.BigQueryTable(
            order = 1,
            description = "BigQuery output table",
            helpText =
                "The BigQuery output table location, in the format `<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>`")
        ValueProvider<String> getOutputTableSpec();
    
        void setOutputTableSpec(ValueProvider<String> value);
    
        @TemplateParameter.PubsubTopic(
            order = 2,
            description = "Input Pub/Sub topic",
            helpText = "The Pub/Sub topic to read the input from.")
        ValueProvider<String> getInputTopic();
    
        void setInputTopic(ValueProvider<String> value);
    
        @TemplateParameter.PubsubSubscription(
            order = 3,
            description = "Pub/Sub input subscription",
            helpText =
                "The Pub/Sub input subscription to read from, in the format `projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION>`.")
        ValueProvider<String> getInputSubscription();
    
        void setInputSubscription(ValueProvider<String> value);
    
        @TemplateCreationParameter(template = "PubSub_to_BigQuery", value = "false")
        @TemplateCreationParameter(template = "PubSub_Subscription_to_BigQuery", value = "true")
        @Description(
            "This determines whether the template reads from a Pub/sub subscription or a topic")
        @Default.Boolean(false)
        Boolean getUseSubscription();
    
        void setUseSubscription(Boolean value);
    
        @TemplateParameter.BigQueryTable(
            order = 5,
            optional = true,
            description =
                "Table for messages failed to reach the output table (i.e., Deadletter table)",
            helpText =
                "The BigQuery table to use for messages that fail to reach the output table, in the format of `<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>`. If the table doesn't exist, it is created during pipeline execution. If not specified, `OUTPUT_TABLE_SPEC_error_records` is used.")
        ValueProvider<String> getOutputDeadletterTable();
    
        void setOutputDeadletterTable(ValueProvider<String> value);
      }
    
      /**
       * The main entry-point for pipeline execution. This method will start the pipeline but will not
       * wait for it's execution to finish. If blocking execution is required, use the {@link
       * PubSubToBigQuery#run(Options)} method to start the pipeline and invoke {@code
       * result.waitUntilFinish()} on the {@link PipelineResult}.
       *
       * @param args The command-line args passed by the executor.
       */
      public static void main(String[] args) {
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    
        run(options);
      }
    
      /**
       * Runs the pipeline to completion with the specified options. This method does not wait until the
       * pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
       * object to block until the pipeline is finished running if blocking programmatic execution is
       * required.
       *
       * @param options The execution options.
       * @return The pipeline result.
       */
      public static PipelineResult run(Options options) {
    
        Pipeline pipeline = Pipeline.create(options);
    
        CoderRegistry coderRegistry = pipeline.getCoderRegistry();
        coderRegistry.registerCoderForType(CODER.getEncodedTypeDescriptor(), CODER);
    
        /*
         * Steps:
         *  1) Read messages in from Pub/Sub
         *  2) Transform the PubsubMessages into TableRows
         *     - Transform message payload via UDF
         *     - Convert UDF result to TableRow objects
         *  3) Write successful records out to BigQuery
         *  4) Write failed records out to BigQuery
         */
    
        /*
         * Step #1: Read messages in from Pub/Sub
         * Either from a Subscription or Topic
         */
    
        PCollection<PubsubMessage> messages = null;
        if (options.getUseSubscription()) {
          messages =
              pipeline.apply(
                  "ReadPubSubSubscription",
                  PubsubIO.readMessagesWithAttributes()
                      .fromSubscription(options.getInputSubscription()));
        } else {
          messages =
              pipeline.apply(
                  "ReadPubSubTopic",
                  PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()));
        }
    
        PCollectionTuple convertedTableRows =
            messages
                /*
                 * Step #2: Transform the PubsubMessages into TableRows
                 */
                .apply("ConvertMessageToTableRow", new PubsubMessageToTableRow(options));
    
        /*
         * Step #3: Write the successful records out to BigQuery
         */
        WriteResult writeResult =
            convertedTableRows
                .get(TRANSFORM_OUT)
                .apply(
                    "WriteSuccessfulRecords",
                    BigQueryIO.writeTableRows()
                        .withoutValidation()
                        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                        .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                        .withExtendedErrorInfo()
                        .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                        .to(options.getOutputTableSpec()));
    
        /*
         * Step 3 Contd.
         * Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
         */
        PCollection<FailsafeElement<String, String>> failedInserts =
            writeResult
                .getFailedInsertsWithErr()
                .apply(
                    "WrapInsertionErrors",
                    MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                        .via((BigQueryInsertError e) -> wrapBigQueryInsertError(e)))
                .setCoder(FAILSAFE_ELEMENT_CODER);
    
        /*
         * Step #4: Write records that failed table row transformation
         * or conversion out to BigQuery deadletter table.
         */
        PCollectionList.of(
                ImmutableList.of(
                    convertedTableRows.get(UDF_DEADLETTER_OUT),
                    convertedTableRows.get(TRANSFORM_DEADLETTER_OUT)))
            .apply("Flatten", Flatten.pCollections())
            .apply(
                "WriteFailedRecords",
                ErrorConverters.WritePubsubMessageErrors.newBuilder()
                    .setErrorRecordsTable(
                        ValueProviderUtils.maybeUseDefaultDeadletterTable(
                            options.getOutputDeadletterTable(),
                            options.getOutputTableSpec(),
                            DEFAULT_DEADLETTER_TABLE_SUFFIX))
                    .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
                    .build());
    
        // 5) Insert records that failed insert into deadletter table
        failedInserts.apply(
            "WriteFailedRecords",
            ErrorConverters.WriteStringMessageErrors.newBuilder()
                .setErrorRecordsTable(
                    ValueProviderUtils.maybeUseDefaultDeadletterTable(
                        options.getOutputDeadletterTable(),
                        options.getOutputTableSpec(),
                        DEFAULT_DEADLETTER_TABLE_SUFFIX))
                .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
                .build());
    
        return pipeline.run();
      }
    
      /**
       * If deadletterTable is available, it is returned as is, otherwise outputTableSpec +
       * defaultDeadLetterTableSuffix is returned instead.
       */
      private static ValueProvider<String> maybeUseDefaultDeadletterTable(
          ValueProvider<String> deadletterTable,
          ValueProvider<String> outputTableSpec,
          String defaultDeadLetterTableSuffix) {
        return DualInputNestedValueProvider.of(
            deadletterTable,
            outputTableSpec,
            new SerializableFunction<TranslatorInput<String, String>, String>() {
              @Override
              public String apply(TranslatorInput<String, String> input) {
                String userProvidedTable = input.getX();
                String outputTableSpec = input.getY();
                if (userProvidedTable == null) {
                  return outputTableSpec + defaultDeadLetterTableSuffix;
                }
                return userProvidedTable;
              }
            });
      }
    
      /**
       * The {@link PubsubMessageToTableRow} class is a {@link PTransform} which transforms incoming
       * {@link PubsubMessage} objects into {@link TableRow} objects for insertion into BigQuery while
       * applying an optional UDF to the input. The executions of the UDF and transformation to {@link
       * TableRow} objects is done in a fail-safe way by wrapping the element with it's original payload
       * inside the {@link FailsafeElement} class. The {@link PubsubMessageToTableRow} transform will
       * output a {@link PCollectionTuple} which contains all output and dead-letter {@link
       * PCollection}.
       *
       * <p>The {@link PCollectionTuple} output will contain the following {@link PCollection}:
       *
       * <ul>
       *   <li>{@link PubSubToBigQuery#UDF_OUT} - Contains all {@link FailsafeElement} records
       *       successfully processed by the optional UDF.
       *   <li>{@link PubSubToBigQuery#UDF_DEADLETTER_OUT} - Contains all {@link FailsafeElement}
       *       records which failed processing during the UDF execution.
       *   <li>{@link PubSubToBigQuery#TRANSFORM_OUT} - Contains all records successfully converted from
       *       JSON to {@link TableRow} objects.
       *   <li>{@link PubSubToBigQuery#TRANSFORM_DEADLETTER_OUT} - Contains all {@link FailsafeElement}
       *       records which couldn't be converted to table rows.
       * </ul>
       */
      static class PubsubMessageToTableRow
          extends PTransform<PCollection<PubsubMessage>, PCollectionTuple> {
    
        private final Options options;
    
        PubsubMessageToTableRow(Options options) {
          this.options = options;
        }
    
        @Override
        public PCollectionTuple expand(PCollection<PubsubMessage> input) {
    
          PCollectionTuple udfOut =
              input
                  // Map the incoming messages into FailsafeElements so we can recover from failures
                  // across multiple transforms.
                  .apply("MapToRecord", ParDo.of(new PubsubMessageToFailsafeElementFn()))
                  .apply(
                      "InvokeUDF",
                      FailsafeJavascriptUdf.<PubsubMessage>newBuilder()
                          .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
                          .setFunctionName(options.getJavascriptTextTransformFunctionName())
                          .setReloadIntervalMinutes(
                              options.getJavascriptTextTransformReloadIntervalMinutes())
                          .setSuccessTag(UDF_OUT)
                          .setFailureTag(UDF_DEADLETTER_OUT)
                          .build());
    
          // Convert the records which were successfully processed by the UDF into TableRow objects.
          PCollectionTuple jsonToTableRowOut =
              udfOut
                  .get(UDF_OUT)
                  .apply(
                      "JsonToTableRow",
                      FailsafeJsonToTableRow.<PubsubMessage>newBuilder()
                          .setSuccessTag(TRANSFORM_OUT)
                          .setFailureTag(TRANSFORM_DEADLETTER_OUT)
                          .build());
    
          // Re-wrap the PCollections so we can return a single PCollectionTuple
          return PCollectionTuple.of(UDF_OUT, udfOut.get(UDF_OUT))
              .and(UDF_DEADLETTER_OUT, udfOut.get(UDF_DEADLETTER_OUT))
              .and(TRANSFORM_OUT, jsonToTableRowOut.get(TRANSFORM_OUT))
              .and(TRANSFORM_DEADLETTER_OUT, jsonToTableRowOut.get(TRANSFORM_DEADLETTER_OUT));
        }
      }
    
      /**
       * The {@link PubsubMessageToFailsafeElementFn} wraps an incoming {@link PubsubMessage} with the
       * {@link FailsafeElement} class so errors can be recovered from and the original message can be
       * output to a error records table.
       */
      static class PubsubMessageToFailsafeElementFn
          extends DoFn<PubsubMessage, FailsafeElement<PubsubMessage, String>> {
        @ProcessElement
        public void processElement(ProcessContext context) {
          PubsubMessage message = context.element();
          context.output(
              FailsafeElement.of(message, new String(message.getPayload(), StandardCharsets.UTF_8)));
        }
      }
    }
    

    ¿Qué sigue?