Plantilla de Proto de Pub/Sub a BigQuery con UDF de Python

La plantilla de proto de Pub/Sub a BigQuery es una canalización de transmisión que transfiere datos de proto desde una suscripción a Pub/Sub hacia una tabla de BigQuery. Cualquier error que ocurra mientras se escribe en la tabla de BigQuery se transmite a un tema de Pub/Sub sin procesar.

Se puede proporcionar una función definida por el usuario (UDF) de Python para transformar los datos. Los errores mientras se ejecuta la UDF se pueden enviar a un tema de Pub/Sub separado o al mismo tema sin procesar que los errores de BigQuery.

Requisitos de la canalización

  • La suscripción de entrada de Pub/Sub debe existir.
  • El archivo de esquema de los registros proto debe existir en Cloud Storage.
  • El tema de Pub/Sub de salida debe existir.
  • El conjunto de datos de salida de BigQuery debe existir.
  • Si la tabla de BigQuery existe, debe tener un esquema que coincida con los datos del proto, sin importar el valor createDisposition.

Parámetros de la plantilla

Parámetro Descripción
protoSchemaPath La ubicación de Cloud Storage del archivo de esquema proto autónomo. Por ejemplo, gs://path/to/my/file.pb. Este archivo se puede generar con la marca --descriptor_set_out del comando protoc. La marca --include_imports garantiza que el archivo sea autónomo.
fullMessageName El nombre completo del mensaje proto. Por ejemplo, package.name.MessageName, en el que package.name es el valor proporcionado para la declaración package y no la declaración java_package.
inputSubscription Suscripción de entrada de Pub/Sub desde la que se desea leer. Por ejemplo, projects/<project>/subscriptions/<subscription>.
outputTopic El tema de Pub/Sub que se usará para registros no procesados. Por ejemplo, projects/<project-id>/topics/<topic-name>.
outputTableSpec Ubicación de la tabla de salida de BigQuery. Por ejemplo, my-project:my_dataset.my_table. Según la createDisposition especificada, la tabla de salida se puede crear de forma automática mediante el archivo de esquema de entrada.
preserveProtoFieldNames Opcional: true para conservar el nombre del campo Proto original en JSON. false para usar nombres JSON más estándar. Por ejemplo, false cambiaría field_name a fieldName. (Default: false)
bigQueryTableSchemaPath Opcional: Ruta de Cloud Storage a la ruta del esquema de BigQuery. Por ejemplo, gs://path/to/my/schema.json. Si no se proporciona, entonces el esquema se infiere a partir del esquema Proto.
pythonExternalTextTransformGcsPath Opcional: El URI de Cloud Storage del archivo de código de Python que define la función definida por el usuario (UDF) que deseas usar. Por ejemplo, gs://my-bucket/my-udfs/my_file.py.
pythonExternalTextTransformFunctionName Opcional: El nombre de la función definida por el usuario (UDF) de Python que deseas usar.
udfOutputTopic Opcional: El tema de Pub/Sub que almacena los errores de las UDF. Por ejemplo, projects/<project-id>/topics/<topic-name>. Si no se proporciona, los errores de UDF se envían al mismo tema que outputTopic.
writeDisposition Opcional: El elemento WriteDisposition de BigQuery. Por ejemplo, WRITE_APPEND, WRITE_EMPTY o WRITE_TRUNCATE. Valor predeterminado: WRITE_APPEND.
createDisposition Opcional: El elemento CreateDisposition de BigQuery. Por ejemplo: CREATE_IF_NEEDED, CREATE_NEVER. Configuración predeterminada: CREATE_IF_NEEDED.
useStorageWriteApi Opcional: Si es true, la canalización usa la API de BigQuery Storage Write. El valor predeterminado es false. Para obtener más información, consulta Usa la API de BigQuery Storage Write.
useStorageWriteApiAtLeastOnce Opcional: Cuando usas la API de Storage Write, se especifica la semántica de escritura (opcional). Para usar una semántica de al menos una vez, establece este parámetro en true. Para usar una semántica de una y solo una vez, configura el parámetro en false. Este parámetro se aplica solo cuando useStorageWriteApi es true. El valor predeterminado es false.
numStorageWriteApiStreams Opcional: Cuando usas la API de Storage Write, se especifica la cantidad de transmisiones de escritura. Si useStorageWriteApi es true y useStorageWriteApiAtLeastOnce es false, debes configurar este parámetro.
storageWriteApiTriggeringFrequencySec Opcional: Cuando se usa la API de Storage Write, se especifica la frecuencia de activación en segundos. Si useStorageWriteApi es true y useStorageWriteApiAtLeastOnce es false, debes configurar este parámetro.

Función definida por el usuario

Para extender esta plantilla, puedes escribir una función definida por el usuario (UDF). La plantilla llama a la UDF para cada elemento de entrada. Las cargas útiles de elementos se serializan como cadenas JSON. Para obtener más información, consulta Crea funciones definidas por el usuario para plantillas de Dataflow.

Especificación de la función

La UDF tiene la siguiente especificación:

  • Entrada: el campo de datos del mensaje de Pub/Sub, serializado como una cadena JSON
  • Resultado: Una cadena JSON que coincide con el esquema de la tabla de destino de BigQuery.
  • Ejecuta la plantilla

    1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
    2. Ir a Crear un trabajo a partir de una plantilla
    3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
    4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

      Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

    5. En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub Proto to BigQuery with Python UDF template.
    6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
    7. Haga clic en Ejecutar trabajo.

    En tu shell o terminal, ejecuta la plantilla:

    gcloud dataflow flex-template run JOB_NAME \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Xlang \
        --parameters \
    schemaPath=SCHEMA_PATH,\
    fullMessageName=PROTO_MESSAGE_NAME,\
    inputSubscription=SUBSCRIPTION_NAME,\
    outputTableSpec=BIGQUERY_TABLE,\
    outputTopic=UNPROCESSED_TOPIC
      

    Reemplaza lo siguiente:

    • JOB_NAME: Es el nombre del trabajo que elijas
    • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
    • VERSION: Es la versión de la plantilla que deseas usar.

      Puedes usar los siguientes valores:

      • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
      • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
    • SCHEMA_PATH: Es la ruta de acceso de Cloud Storage al archivo de esquema de Proto (por ejemplo, gs://MyBucket/file.pb).
    • PROTO_MESSAGE_NAME: Es el nombre del mensaje Proto (por ejemplo, package.name.MessageName).
    • SUBSCRIPTION_NAME: Es el nombre de la suscripción de entrada de Pub/Sub.
    • BIGQUERY_TABLE: Es el nombre de la tabla de salida de BigQuery.
    • UNPROCESSED_TOPIC: Es el tema de Pub/Sub que se usará para la cola no procesada.

    Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Xlang",
          "parameters": {
              "schemaPath": "SCHEMA_PATH",
              "fullMessageName": "PROTO_MESSAGE_NAME",
              "inputSubscription": "SUBSCRIPTION_NAME",
              "outputTableSpec": "BIGQUERY_TABLE",
              "outputTopic": "UNPROCESSED_TOPIC"
          }
       }
    }
      

    Reemplaza lo siguiente:

    • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
    • JOB_NAME: Es el nombre del trabajo que elijas
    • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
    • VERSION: Es la versión de la plantilla que deseas usar.

      Puedes usar los siguientes valores:

      • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
      • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
    • SCHEMA_PATH: Es la ruta de acceso de Cloud Storage al archivo de esquema de Proto (por ejemplo, gs://MyBucket/file.pb).
    • PROTO_MESSAGE_NAME: Es el nombre del mensaje Proto (por ejemplo, package.name.MessageName).
    • SUBSCRIPTION_NAME: Es el nombre de la suscripción de entrada de Pub/Sub.
    • BIGQUERY_TABLE: Es el nombre de la tabla de salida de BigQuery.
    • UNPROCESSED_TOPIC: Es el tema de Pub/Sub que se usará para la cola no procesada.
    Java
    /*
     * Copyright (C) 2021 Google LLC
     *
     * Licensed under the Apache License, Version 2.0 (the "License"); you may not
     * use this file except in compliance with the License. You may obtain a copy of
     * the License at
     *
     *   http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
     * License for the specific language governing permissions and limitations under
     * the License.
     */
    package com.google.cloud.teleport.v2.templates;
    
    import static com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer.FailsafeRowPythonExternalUdf.stringMappingFunction;
    import static java.nio.charset.StandardCharsets.UTF_8;
    
    import com.google.cloud.teleport.metadata.MultiTemplate;
    import com.google.cloud.teleport.metadata.Template;
    import com.google.cloud.teleport.metadata.TemplateCategory;
    import com.google.cloud.teleport.metadata.TemplateParameter;
    import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
    import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
    import com.google.cloud.teleport.v2.options.BigQueryCommonOptions.WriteOptions;
    import com.google.cloud.teleport.v2.options.BigQueryStorageApiStreamingOptions;
    import com.google.cloud.teleport.v2.options.PubsubCommonOptions.ReadSubscriptionOptions;
    import com.google.cloud.teleport.v2.options.PubsubCommonOptions.WriteTopicOptions;
    import com.google.cloud.teleport.v2.templates.PubsubProtoToBigQuery.PubSubProtoToBigQueryOptions;
    import com.google.cloud.teleport.v2.transforms.BigQueryConverters;
    import com.google.cloud.teleport.v2.transforms.ErrorConverters;
    import com.google.cloud.teleport.v2.transforms.FailsafeElementTransforms.ConvertFailsafeElementToPubsubMessage;
    import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.FailsafeJavascriptUdf;
    import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer;
    import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer.RowToStringFailsafeElementFn;
    import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
    import com.google.cloud.teleport.v2.utils.GCSUtils;
    import com.google.cloud.teleport.v2.utils.SchemaUtils;
    import com.google.cloud.teleport.v2.values.FailsafeElement;
    import com.google.common.annotations.VisibleForTesting;
    import com.google.common.base.Strings;
    import com.google.protobuf.Descriptors.Descriptor;
    import com.google.protobuf.DynamicMessage;
    import com.google.protobuf.InvalidProtocolBufferException;
    import com.google.protobuf.util.JsonFormat;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.coders.NullableCoder;
    import org.apache.beam.sdk.coders.RowCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
    import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.Read;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.options.Validation.Required;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.TupleTag;
    import org.apache.beam.sdk.values.TupleTagList;
    import org.apache.beam.sdk.values.TypeDescriptor;
    import org.apache.beam.sdk.values.TypeDescriptors;
    import org.apache.commons.lang3.ArrayUtils;
    
    /**
     * A template for writing <a href="https://developers.google.com/protocol-buffers">Protobuf</a>
     * records from Pub/Sub to BigQuery.
     *
     * <p>Persistent failures are written to a Pub/Sub unprocessed topic.
     *
     * <p>Check out <a
     * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/pubsub-binary-to-bigquery/README_PubSub_Proto_to_BigQuery.md">README</a>
     * for instructions on how to use or modify this template.
     */
    @MultiTemplate({
      @Template(
          name = "PubSub_Proto_to_BigQuery_Flex",
          category = TemplateCategory.STREAMING,
          displayName = "Pub/Sub Proto to BigQuery",
          description = {
            "The Pub/Sub proto to BigQuery template is a streaming pipeline that ingests proto data from a Pub/Sub subscription into a BigQuery table. "
                + "Any errors that occur while writing to the BigQuery table are streamed into a Pub/Sub unprocessed topic.\n",
            "A JavaScript user-defined function (UDF) can be provided to transform data. "
                + "Errors while executing the UDF can be sent to either a separate Pub/Sub topic or the same unprocessed topic as the BigQuery errors."
          },
          skipOptions = {
            "pythonExternalTextTransformGcsPath",
            "pythonExternalTextTransformFunctionName"
          },
          optionsClass = PubSubProtoToBigQueryOptions.class,
          flexContainerName = "pubsub-proto-to-bigquery",
          documentation =
              "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-proto-to-bigquery",
          contactInformation = "https://cloud.google.com/support",
          requirements = {
            "The input Pub/Sub subscription must exist.",
            "The schema file for the Proto records must exist on Cloud Storage.",
            "The output Pub/Sub topic must exist.",
            "The output BigQuery dataset must exist.",
            "If the BigQuery table exists, it must have a schema matching the proto data regardless of the <code>createDisposition</code> value."
          },
          streaming = true,
          supportsAtLeastOnce = true),
      @Template(
          name = "PubSub_Proto_to_BigQuery_Xlang",
          category = TemplateCategory.STREAMING,
          displayName = "Pub/Sub Proto to BigQuery with Python UDF",
          type = Template.TemplateType.XLANG,
          description = {
            "The Pub/Sub proto to BigQuery template is a streaming pipeline that ingests proto data from a Pub/Sub subscription into a BigQuery table. "
                + "Any errors that occur while writing to the BigQuery table are streamed into a Pub/Sub unprocessed topic.\n",
            "A Python user-defined function (UDF) can be provided to transform data. "
                + "Errors while executing the UDF can be sent to either a separate Pub/Sub topic or the same unprocessed topic as the BigQuery errors."
          },
          skipOptions = {
            "javascriptTextTransformGcsPath",
            "javascriptTextTransformFunctionName",
            "javascriptTextTransformReloadIntervalMinutes"
          },
          optionsClass = PubSubProtoToBigQueryOptions.class,
          flexContainerName = "pubsub-proto-to-bigquery-xlang",
          documentation =
              "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-proto-to-bigquery",
          contactInformation = "https://cloud.google.com/support",
          requirements = {
            "The input Pub/Sub subscription must exist.",
            "The schema file for the Proto records must exist on Cloud Storage.",
            "The output Pub/Sub topic must exist.",
            "The output BigQuery dataset must exist.",
            "If the BigQuery table exists, it must have a schema matching the proto data regardless of the <code>createDisposition</code> value."
          },
          streaming = true,
          supportsAtLeastOnce = true)
    })
    public final class PubsubProtoToBigQuery {
      private static final TupleTag<FailsafeElement<String, String>> UDF_SUCCESS_TAG = new TupleTag<>();
      private static final TupleTag<FailsafeElement<String, String>> UDF_FAILURE_TAG = new TupleTag<>();
    
      private static final FailsafeElementCoder<String, String> FAILSAFE_CODER =
          FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
    
      public static void main(String[] args) {
        UncaughtExceptionLogger.register();
        run(PipelineOptionsFactory.fromArgs(args).as(PubSubProtoToBigQueryOptions.class));
      }
    
      /** {@link org.apache.beam.sdk.options.PipelineOptions} for {@link PubsubProtoToBigQuery}. */
      public interface PubSubProtoToBigQueryOptions
          extends ReadSubscriptionOptions,
              WriteOptions,
              WriteTopicOptions,
              PythonExternalTextTransformer.PythonExternalTextTransformerOptions,
              BigQueryStorageApiStreamingOptions {
    
        @TemplateParameter.GcsReadFile(
            order = 1,
            description = "Cloud Storage Path to the Proto Schema File",
            helpText =
                "The Cloud Storage location of the self-contained proto schema file. For example,"
                    + " `gs://path/to/my/file.pb`. You can generate this file with"
                    + " the `--descriptor_set_out` flag of the protoc command."
                    + " The `--include_imports` flag guarantees that the file is self-contained.")
        @Required
        String getProtoSchemaPath();
    
        void setProtoSchemaPath(String value);
    
        @TemplateParameter.Text(
            order = 2,
            regexes = {"^.+([a-zA-Z][a-zA-Z0-9_]+\\.?)+[a-zA-Z0-9_]$"},
            description = "Full Proto Message Name",
            helpText =
                "The full proto message name. For example, `package.name`."
                    + " `MessageName`, where `package.name` is the value provided for the"
                    + " `package` statement and not the `java_package` statement.")
        @Required
        String getFullMessageName();
    
        void setFullMessageName(String value);
    
        @TemplateParameter.Boolean(
            order = 3,
            optional = true,
            description = "Preserve Proto Field Names",
            helpText =
                "To preserve the original proto field name in JSON, set this property to `true`. "
                    + "To use more standard JSON names, set to `false`."
                    + " For example, `false` would change `field_name` to `fieldName`. Defaults to: `false`.")
        @Default.Boolean(false)
        Boolean getPreserveProtoFieldNames();
    
        void setPreserveProtoFieldNames(Boolean value);
    
        @TemplateParameter.GcsReadFile(
            order = 4,
            optional = true,
            description = "BigQuery Table Schema Path",
            helpText =
                "The Cloud Storage path to the BigQuery schema path. "
                    + "If this value isn't provided, then the schema is inferred from the Proto schema.",
            example = "gs://MyBucket/bq_schema.json")
        String getBigQueryTableSchemaPath();
    
        void setBigQueryTableSchemaPath(String value);
    
        @TemplateParameter.PubsubTopic(
            order = 5,
            optional = true,
            description = "Pub/Sub output topic for UDF failures",
            helpText =
                "The Pub/Sub topic storing the UDF errors."
                    + " If this value isn't provided, UDF errors are sent to the same topic as `outputTopic`.",
            example = "projects/your-project-id/topics/your-topic-name")
        String getUdfOutputTopic();
    
        void setUdfOutputTopic(String udfOutputTopic);
    
        // Hide the UseStorageWriteApiAtLeastOnce in the UI, because it will automatically be turned
        // on when pipeline is running on ALO mode and using the Storage Write API
        @TemplateParameter.Boolean(
            order = 6,
            optional = true,
            parentName = "useStorageWriteApi",
            parentTriggerValues = {"true"},
            description = "Use at at-least-once semantics in BigQuery Storage Write API",
            helpText =
                "When using the Storage Write API, specifies the write semantics."
                    + " To use at-least-once semantics (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), set this parameter to true`. To use exactly-once semantics, set the parameter to `false`."
                    + " This parameter applies only when `useStorageWriteApi` is `true`. The default value is `false`.",
            hiddenUi = true)
        @Default.Boolean(false)
        @Override
        Boolean getUseStorageWriteApiAtLeastOnce();
    
        void setUseStorageWriteApiAtLeastOnce(Boolean value);
      }
    
      /** Runs the pipeline and returns the results. */
      private static PipelineResult run(PubSubProtoToBigQueryOptions options) {
        BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);
    
        Pipeline pipeline = Pipeline.create(options);
    
        Descriptor descriptor = getDescriptor(options);
        PCollection<String> maybeForUdf =
            pipeline
                .apply("Read From Pubsub", readPubsubMessages(options, descriptor))
                .apply("Dynamic Message to TableRow", new ConvertDynamicProtoMessageToJson(options));
    
        WriteResult writeResult =
            runUdf(maybeForUdf, options)
                .apply("Write to BigQuery", writeToBigQuery(options, descriptor));
        BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
            .apply(
                "Create Error Payload",
                ErrorConverters.BigQueryInsertErrorToPubsubMessage.<String>newBuilder()
                    .setPayloadCoder(StringUtf8Coder.of())
                    .setTranslateFunction(BigQueryConverters::tableRowToJson)
                    .build())
            .apply("Write Failed BQ Records", PubsubIO.writeMessages().to(options.getOutputTopic()));
    
        return pipeline.run();
      }
    
      /** Gets the {@link Descriptor} for the message type in the Pub/Sub topic. */
      @VisibleForTesting
      static Descriptor getDescriptor(PubSubProtoToBigQueryOptions options) {
        String schemaPath = options.getProtoSchemaPath();
        String messageName = options.getFullMessageName();
        Descriptor descriptor = SchemaUtils.getProtoDomain(schemaPath).getDescriptor(messageName);
    
        if (descriptor == null) {
          throw new IllegalArgumentException(
              messageName + " is not a recognized message in " + schemaPath);
        }
    
        return descriptor;
      }
    
      /** Returns the {@link PTransform} for reading Pub/Sub messages. */
      private static Read<DynamicMessage> readPubsubMessages(
          PubSubProtoToBigQueryOptions options, Descriptor descriptor) {
        return PubsubIO.readProtoDynamicMessages(descriptor)
            .fromSubscription(options.getInputSubscription())
            .withDeadLetterTopic(options.getOutputTopic());
      }
    
      /**
       * Writes messages to BigQuery, creating the table if necessary and allowed in {@code options}.
       *
       * <p>The BigQuery schema will be inferred from {@code descriptor} unless a JSON schema path is
       * specified in {@code options}.
       */
      @VisibleForTesting
      static Write<String> writeToBigQuery(
          PubSubProtoToBigQueryOptions options, Descriptor descriptor) {
        Write<String> write =
            BigQueryConverters.<String>createWriteTransform(options)
                .withFormatFunction(BigQueryConverters::convertJsonToTableRow);
    
        String schemaPath = options.getBigQueryTableSchemaPath();
        if (Strings.isNullOrEmpty(schemaPath)) {
          return write.withSchema(
              SchemaUtils.createBigQuerySchema(descriptor, options.getPreserveProtoFieldNames()));
        } else {
          return write.withJsonSchema(GCSUtils.getGcsFileAsString(schemaPath));
        }
      }
    
      /** {@link PTransform} that handles converting {@link PubsubMessage} values to JSON. */
      private static class ConvertDynamicProtoMessageToJson
          extends PTransform<PCollection<DynamicMessage>, PCollection<String>> {
        private final boolean preserveProtoName;
    
        private ConvertDynamicProtoMessageToJson(PubSubProtoToBigQueryOptions options) {
          this.preserveProtoName = options.getPreserveProtoFieldNames();
        }
    
        @Override
        public PCollection<String> expand(PCollection<DynamicMessage> input) {
          return input.apply(
              "Map to JSON",
              MapElements.into(TypeDescriptors.strings())
                  .via(
                      message -> {
                        try {
                          JsonFormat.Printer printer = JsonFormat.printer();
                          return preserveProtoName
                              ? printer.preservingProtoFieldNames().print(message)
                              : printer.print(message);
                        } catch (InvalidProtocolBufferException e) {
                          throw new RuntimeException(e);
                        }
                      }));
        }
      }
    
      /**
       * Handles running the UDF.
       *
       * <p>If {@code options} is configured so as not to run the UDF, then the UDF will not be called.
       *
       * <p>This may add a branch to the pipeline for outputting failed UDF records to an unprocessed
       * topic.
       *
       * @param jsonCollection {@link PCollection} of JSON strings for use as input to the UDF
       * @param options the options containing info on running the UDF
       * @return the {@link PCollection} of UDF output as JSON or {@code jsonCollection} if UDF not
       *     called
       */
      @VisibleForTesting
      static PCollection<String> runUdf(
          PCollection<String> jsonCollection, PubSubProtoToBigQueryOptions options) {
    
        boolean useJavascriptUdf = !Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath());
        boolean usePythonUdf = !Strings.isNullOrEmpty(options.getPythonExternalTextTransformGcsPath());
    
        // In order to avoid generating a graph that makes it look like a UDF was called when none was
        // intended, simply return the input as "success" output.
        if (!useJavascriptUdf && !usePythonUdf) {
          return jsonCollection;
        }
    
        // For testing purposes, we need to do this check before creating the PTransform rather than
        // in `expand`. Otherwise, we get a NullPointerException due to the PTransform not returning
        // a value.
        if (useJavascriptUdf
            && Strings.isNullOrEmpty(options.getJavascriptTextTransformFunctionName())) {
          throw new IllegalArgumentException(
              "JavaScript function name cannot be null or empty if file is set");
        }
        if (usePythonUdf
            && Strings.isNullOrEmpty(options.getPythonExternalTextTransformFunctionName())) {
          throw new IllegalArgumentException(
              "Python function name cannot be null or empty if file is set");
        }
        if (usePythonUdf && useJavascriptUdf) {
          throw new IllegalArgumentException(
              "Either javascript or Python gcs path must be provided, but not both.");
        }
    
        PCollectionTuple maybeSuccess;
        if (usePythonUdf) {
          maybeSuccess = jsonCollection.apply("Run UDF", new RunPythonUdf(options));
        } else {
          maybeSuccess = jsonCollection.apply("Run UDF", new RunUdf(options));
        }
    
        maybeSuccess
            .get(UDF_FAILURE_TAG)
            .setCoder(FAILSAFE_CODER)
            .apply(
                "Get UDF Failures",
                ConvertFailsafeElementToPubsubMessage.<String, String>builder()
                    .setOriginalPayloadSerializeFn(s -> ArrayUtils.toObject(s.getBytes(UTF_8)))
                    .setErrorMessageAttributeKey("udfErrorMessage")
                    .build())
            .apply("Write Failed UDF", writeUdfFailures(options));
    
        return maybeSuccess
            .get(UDF_SUCCESS_TAG)
            .setCoder(FAILSAFE_CODER)
            .apply(
                "Get UDF Output",
                MapElements.into(TypeDescriptors.strings()).via(FailsafeElement::getPayload))
            .setCoder(NullableCoder.of(StringUtf8Coder.of()));
      }
    
      /** {@link PTransform} that calls a UDF and returns both success and failure output. */
      private static class RunUdf extends PTransform<PCollection<String>, PCollectionTuple> {
        private final PubSubProtoToBigQueryOptions options;
    
        RunUdf(PubSubProtoToBigQueryOptions options) {
          this.options = options;
        }
    
        @Override
        public PCollectionTuple expand(PCollection<String> input) {
          return input
              .apply("Prepare Failsafe UDF", makeFailsafe())
              .setCoder(FAILSAFE_CODER)
              .apply(
                  "Call UDF",
                  FailsafeJavascriptUdf.<String>newBuilder()
                      .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
                      .setFunctionName(options.getJavascriptTextTransformFunctionName())
                      .setReloadIntervalMinutes(
                          options.getJavascriptTextTransformReloadIntervalMinutes())
                      .setSuccessTag(UDF_SUCCESS_TAG)
                      .setFailureTag(UDF_FAILURE_TAG)
                      .build());
        }
    
        private static MapElements<String, FailsafeElement<String, String>> makeFailsafe() {
          return MapElements.into(new TypeDescriptor<FailsafeElement<String, String>>() {})
              .via((String json) -> FailsafeElement.of(json, json));
        }
      }
    
      /** {@link PTransform} that calls a python UDF and returns both success and failure output. */
      private static class RunPythonUdf extends PTransform<PCollection<String>, PCollectionTuple> {
        private final PubSubProtoToBigQueryOptions options;
    
        RunPythonUdf(PubSubProtoToBigQueryOptions options) {
          this.options = options;
        }
    
        @Override
        public PCollectionTuple expand(PCollection<String> input) {
          return input
              .apply("Prepare Failsafe row", stringMappingFunction())
              .setCoder(
                  RowCoder.of(PythonExternalTextTransformer.FailsafeRowPythonExternalUdf.ROW_SCHEMA))
              .apply(
                  "InvokeUDF",
                  PythonExternalTextTransformer.FailsafePythonExternalUdf.newBuilder()
                      .setFileSystemPath(options.getPythonExternalTextTransformGcsPath())
                      .setFunctionName(options.getPythonExternalTextTransformFunctionName())
                      .build())
              .apply(
                  "MapRowsToFailsafeElements",
                  ParDo.of(new RowToStringFailsafeElementFn(UDF_SUCCESS_TAG, UDF_FAILURE_TAG))
                      .withOutputTags(UDF_SUCCESS_TAG, TupleTagList.of(UDF_FAILURE_TAG)));
        }
      }
    
      /**
       * Returns a {@link PubsubIO.Write} configured to write UDF failures to the appropriate output
       * topic.
       */
      private static PubsubIO.Write<PubsubMessage> writeUdfFailures(
          PubSubProtoToBigQueryOptions options) {
        PubsubIO.Write<PubsubMessage> write = PubsubIO.writeMessages();
        return Strings.isNullOrEmpty(options.getUdfOutputTopic())
            ? write.to(options.getOutputTopic())
            : write.to(options.getUdfOutputTopic());
      }
    }
    

    ¿Qué sigue?