Vorlage "Pub/Sub Proto für BigQuery mit Python"

Die Vorlage „Pub/Sub Proto für BigQuery“ ist eine Streamingpipeline, die Proto-Daten aus einem Pub/Sub-Abo in eine BigQuery-Tabelle schreibt. Alle Fehler beim Schreiben in die BigQuery-Tabelle werden in ein Pub/Sub-Thema für nicht verarbeitete Datensätze gestreamt.

Sie können eine benutzerdefinierte Funktion (User-defined Function, UDF) zum Transformieren von Daten bereitstellen. Fehler während der Ausführung der UDF können entweder an ein separates Pub/Sub-Thema oder an dasselbe nicht verarbeitete Thema wie die BigQuery-Fehler gesendet werden.


  • Das Pub/Sub-Eingabeabo muss vorhanden sein.
  • Die Schemadatei für die Proto-Einträge muss in Cloud Storage hinterlegt sein.
  • Das Pub/Sub-Ausgabethema muss vorhanden sein.
  • Das BigQuery-Ausgabe-Dataset muss vorhanden sein.
  • Wenn die BigQuery-Tabelle vorhanden ist, muss sie ein Schema haben, das mit den Proto-Daten unabhängig vom createDisposition-Wert übereinstimmt.


Parameter Beschreibung
protoSchemaPath Der Cloud Storage-Speicherort der eigenständigen Proto-Schemadatei. z. B. gs://path/to/my/file.pb. Diese Datei kann mit dem Flag --descriptor_set_out des Befehls protoc generiert werden. Das Flag --include_imports garantiert, dass die Datei unabhängig ist.
fullMessageName Der vollständige Proto-Nachrichtenname. Beispiel: package.name.MessageName, wobei package.name der Wert für die Anweisung package und nicht für die Anweisung java_package ist.
inputSubscription Das Pub/Sub-Eingabeabo, aus dem gelesen werden soll. z. B. projects/<project>/subscriptions/<subscription>.
outputTopic Das Pub/Sub-Thema, das für nicht verarbeitete Datensätze verwendet werden soll. z. B. projects/<project-id>/topics/<topic-name>.
outputTableSpec Ort der BigQuery-Ausgabetabelle. z. B. my-project:my_dataset.my_table. Abhängig von der angegebenen createDisposition kann die Ausgabetabelle automatisch mit der Eingabeschemadatei erstellt werden.
preserveProtoFieldNames Optional: true, um den ursprünglichen Proto-Feldnamen in JSON beizubehalten. false, um weitere JSON-Standardnamen zu verwenden. Zum Beispiel würde false field_name in fieldName ändern. (Standard: false)
bigQueryTableSchemaPath Optional: Cloud Storage-Pfad zum BigQuery-Schemapfad. Beispiel: gs://path/to/my/schema.json. Falls nicht angegeben ist, wird das Schema aus dem Proto-Schema abgeleitet.
pythonExternalTextTransformGcsPath Optional: Der Cloud Storage-URI der Python-Codedatei, in der die benutzerdefinierte Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.py.
pythonExternalTextTransformFunctionName Optional: Der Name der benutzerdefinierten Python-Funktion, die Sie verwenden möchten.
udfOutputTopic Optional: Das Pub/Sub-Thema, in dem die UDF-Fehler gespeichert werden. z. B. projects/<project-id>/topics/<topic-name> Wenn nicht angegeben, werden UDF-Fehler an dasselbe Thema wie outputTopic gesendet.
writeDisposition Optional: Die BigQuery-WriteDisposition. Beispiel: WRITE_APPEND, WRITE_EMPTY oder WRITE_TRUNCATE. Standard: WRITE_APPEND.
createDisposition Optional: Die BigQuery-CreateDisposition. Beispiele: CREATE_IF_NEEDED, CREATE_NEVER Standard: CREATE_IF_NEEDED.
useStorageWriteApi Optional: Wenn true, verwendet die Pipeline die BigQuery Storage Write API. Der Standardwert ist false. Weitere Informationen finden Sie unter BigQuery Storage Write API verwenden.
useStorageWriteApiAtLeastOnce Optional: Gibt bei Verwendung der Storage Write API die Schreibsemantik an. Wenn Sie "Mindestens einmal"-Semantik verwenden, legen Sie diesen Parameter auf true fest. Wenn Sie die "Genau einmal"-Semantik verwenden möchten, legen Sie den Parameter auf false fest. Dieser Parameter gilt nur, wenn useStorageWriteApi true ist. Der Standardwert ist false.
numStorageWriteApiStreams Optional: Gibt bei Verwendung der Storage Write API die Anzahl der Schreibstreams an. Wenn useStorageWriteApi true und useStorageWriteApiAtLeastOnce false ist, müssen Sie diesen Parameter festlegen.
storageWriteApiTriggeringFrequencySec Optional: Wenn Sie die Storage Write API verwenden, wird die Triggerhäufigkeit in Sekunden angegeben. Wenn useStorageWriteApi true und useStorageWriteApiAtLeastOnce false ist, müssen Sie diesen Parameter festlegen.

Benutzerdefinierte Funktion

Optional können Sie diese Vorlage erweitern, indem Sie eine benutzerdefinierte Funktion (UDF) schreiben. Die Vorlage ruft die UDF für jedes Eingabeelement auf. Nutzlasten von Elementen werden als JSON-Strings serialisiert. Weitere Informationen finden Sie unter Benutzerdefinierte Funktionen für Dataflow-Vorlagen erstellen.


UDFs haben die folgende Spezifikation:

  • Eingabe: Das Feld der Pub/Sub-Nachrichtendaten, das als JSON-String serialisiert ist.
  • Ausgabe: Ein JSON-String, der mit dem Schema der BigQuery-Zieltabelle übereinstimmt.
  • Führen Sie die Vorlage aus.

    1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
    2. Zur Seite "Job aus Vorlage erstellen“
    3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
    4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

      Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

    5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub Proto to BigQuery with Python UDF templateaus.
    6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
    7. Klicken Sie auf Job ausführen.

    Führen Sie die Vorlage in der Shell oder im Terminal aus:

    gcloud dataflow flex-template run JOB_NAME \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Xlang \
        --parameters \

    Ersetzen Sie dabei Folgendes:

    • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
    • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
    • VERSION: Die Version der Vorlage, die Sie verwenden möchten

      Sie können die folgenden Werte verwenden:

      • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
      • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
    • SCHEMA_PATH: der Cloud Storage-Pfad zur Avro-Schemadatei (z. B. gs://MyBucket/file.pb)
    • PROTO_MESSAGE_NAME: der Proto-Nachrichtenname (z. B. package.name.MessageName)
    • SUBSCRIPTION_NAME: der Name des Pub/Sub-Eingabeabos
    • BIGQUERY_TABLE: der Name der BigQuery-Ausgabetabelle
    • UNPROCESSED_TOPIC: Das Pub/Sub-Thema, das für die Warteschlange für nicht verarbeitete Datensätze verwendet werden soll

    Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Xlang",
          "parameters": {
              "schemaPath": "SCHEMA_PATH",
              "fullMessageName": "PROTO_MESSAGE_NAME",
              "inputSubscription": "SUBSCRIPTION_NAME",
              "outputTableSpec": "BIGQUERY_TABLE",
              "outputTopic": "UNPROCESSED_TOPIC"

    Ersetzen Sie dabei Folgendes:

    • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
    • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
    • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
    • VERSION: Die Version der Vorlage, die Sie verwenden möchten

      Sie können die folgenden Werte verwenden:

      • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
      • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
    • SCHEMA_PATH: der Cloud Storage-Pfad zur Avro-Schemadatei (z. B. gs://MyBucket/file.pb)
    • PROTO_MESSAGE_NAME: der Proto-Nachrichtenname (z. B. package.name.MessageName)
    • SUBSCRIPTION_NAME: der Name des Pub/Sub-Eingabeabos
    • BIGQUERY_TABLE: der Name der BigQuery-Ausgabetabelle
    • UNPROCESSED_TOPIC: Das Pub/Sub-Thema, das für die Warteschlange für nicht verarbeitete Datensätze verwendet werden soll
     * Copyright (C) 2021 Google LLC
     * Licensed under the Apache License, Version 2.0 (the "License"); you may not
     * use this file except in compliance with the License. You may obtain a copy of
     * the License at
     *   http://www.apache.org/licenses/LICENSE-2.0
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
     * License for the specific language governing permissions and limitations under
     * the License.
    package com.google.cloud.teleport.v2.templates;
    import static com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer.FailsafeRowPythonExternalUdf.stringMappingFunction;
    import static java.nio.charset.StandardCharsets.UTF_8;
    import com.google.cloud.teleport.metadata.MultiTemplate;
    import com.google.cloud.teleport.metadata.Template;
    import com.google.cloud.teleport.metadata.TemplateCategory;
    import com.google.cloud.teleport.metadata.TemplateParameter;
    import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
    import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
    import com.google.cloud.teleport.v2.options.BigQueryCommonOptions.WriteOptions;
    import com.google.cloud.teleport.v2.options.BigQueryStorageApiStreamingOptions;
    import com.google.cloud.teleport.v2.options.PubsubCommonOptions.ReadSubscriptionOptions;
    import com.google.cloud.teleport.v2.options.PubsubCommonOptions.WriteTopicOptions;
    import com.google.cloud.teleport.v2.templates.PubsubProtoToBigQuery.PubSubProtoToBigQueryOptions;
    import com.google.cloud.teleport.v2.transforms.BigQueryConverters;
    import com.google.cloud.teleport.v2.transforms.ErrorConverters;
    import com.google.cloud.teleport.v2.transforms.FailsafeElementTransforms.ConvertFailsafeElementToPubsubMessage;
    import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.FailsafeJavascriptUdf;
    import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer;
    import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer.RowToStringFailsafeElementFn;
    import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
    import com.google.cloud.teleport.v2.utils.GCSUtils;
    import com.google.cloud.teleport.v2.utils.SchemaUtils;
    import com.google.cloud.teleport.v2.values.FailsafeElement;
    import com.google.common.annotations.VisibleForTesting;
    import com.google.common.base.Strings;
    import com.google.protobuf.Descriptors.Descriptor;
    import com.google.protobuf.DynamicMessage;
    import com.google.protobuf.InvalidProtocolBufferException;
    import com.google.protobuf.util.JsonFormat;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.coders.NullableCoder;
    import org.apache.beam.sdk.coders.RowCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
    import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.Read;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.options.Validation.Required;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.TupleTag;
    import org.apache.beam.sdk.values.TupleTagList;
    import org.apache.beam.sdk.values.TypeDescriptor;
    import org.apache.beam.sdk.values.TypeDescriptors;
    import org.apache.commons.lang3.ArrayUtils;
     * A template for writing <a href="https://developers.google.com/protocol-buffers">Protobuf</a>
     * records from Pub/Sub to BigQuery.
     * <p>Persistent failures are written to a Pub/Sub unprocessed topic.
     * <p>Check out <a
     * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/pubsub-binary-to-bigquery/README_PubSub_Proto_to_BigQuery.md">README</a>
     * for instructions on how to use or modify this template.
          name = "PubSub_Proto_to_BigQuery_Flex",
          category = TemplateCategory.STREAMING,
          displayName = "Pub/Sub Proto to BigQuery",
          description = {
            "The Pub/Sub proto to BigQuery template is a streaming pipeline that ingests proto data from a Pub/Sub subscription into a BigQuery table. "
                + "Any errors that occur while writing to the BigQuery table are streamed into a Pub/Sub unprocessed topic.\n",
            "A JavaScript user-defined function (UDF) can be provided to transform data. "
                + "Errors while executing the UDF can be sent to either a separate Pub/Sub topic or the same unprocessed topic as the BigQuery errors."
          skipOptions = {
          optionsClass = PubSubProtoToBigQueryOptions.class,
          flexContainerName = "pubsub-proto-to-bigquery",
          documentation =
          contactInformation = "https://cloud.google.com/support",
          requirements = {
            "The input Pub/Sub subscription must exist.",
            "The schema file for the Proto records must exist on Cloud Storage.",
            "The output Pub/Sub topic must exist.",
            "The output BigQuery dataset must exist.",
            "If the BigQuery table exists, it must have a schema matching the proto data regardless of the <code>createDisposition</code> value."
          streaming = true,
          supportsAtLeastOnce = true),
          name = "PubSub_Proto_to_BigQuery_Xlang",
          category = TemplateCategory.STREAMING,
          displayName = "Pub/Sub Proto to BigQuery with Python UDF",
          type = Template.TemplateType.XLANG,
          description = {
            "The Pub/Sub proto to BigQuery template is a streaming pipeline that ingests proto data from a Pub/Sub subscription into a BigQuery table. "
                + "Any errors that occur while writing to the BigQuery table are streamed into a Pub/Sub unprocessed topic.\n",
            "A Python user-defined function (UDF) can be provided to transform data. "
                + "Errors while executing the UDF can be sent to either a separate Pub/Sub topic or the same unprocessed topic as the BigQuery errors."
          skipOptions = {
          optionsClass = PubSubProtoToBigQueryOptions.class,
          flexContainerName = "pubsub-proto-to-bigquery-xlang",
          documentation =
          contactInformation = "https://cloud.google.com/support",
          requirements = {
            "The input Pub/Sub subscription must exist.",
            "The schema file for the Proto records must exist on Cloud Storage.",
            "The output Pub/Sub topic must exist.",
            "The output BigQuery dataset must exist.",
            "If the BigQuery table exists, it must have a schema matching the proto data regardless of the <code>createDisposition</code> value."
          streaming = true,
          supportsAtLeastOnce = true)
    public final class PubsubProtoToBigQuery {
      private static final TupleTag<FailsafeElement<String, String>> UDF_SUCCESS_TAG = new TupleTag<>();
      private static final TupleTag<FailsafeElement<String, String>> UDF_FAILURE_TAG = new TupleTag<>();
      private static final FailsafeElementCoder<String, String> FAILSAFE_CODER =
          FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
      public static void main(String[] args) {
      /** {@link org.apache.beam.sdk.options.PipelineOptions} for {@link PubsubProtoToBigQuery}. */
      public interface PubSubProtoToBigQueryOptions
          extends ReadSubscriptionOptions,
              BigQueryStorageApiStreamingOptions {
            order = 1,
            description = "Cloud Storage Path to the Proto Schema File",
            helpText =
                "The Cloud Storage location of the self-contained proto schema file. For example,"
                    + " `gs://path/to/my/file.pb`. You can generate this file with"
                    + " the `--descriptor_set_out` flag of the protoc command."
                    + " The `--include_imports` flag guarantees that the file is self-contained.")
        String getProtoSchemaPath();
        void setProtoSchemaPath(String value);
            order = 2,
            regexes = {"^.+([a-zA-Z][a-zA-Z0-9_]+\\.?)+[a-zA-Z0-9_]$"},
            description = "Full Proto Message Name",
            helpText =
                "The full proto message name. For example, `package.name`."
                    + " `MessageName`, where `package.name` is the value provided for the"
                    + " `package` statement and not the `java_package` statement.")
        String getFullMessageName();
        void setFullMessageName(String value);
            order = 3,
            optional = true,
            description = "Preserve Proto Field Names",
            helpText =
                "To preserve the original proto field name in JSON, set this property to `true`. "
                    + "To use more standard JSON names, set to `false`."
                    + " For example, `false` would change `field_name` to `fieldName`. Defaults to: `false`.")
        Boolean getPreserveProtoFieldNames();
        void setPreserveProtoFieldNames(Boolean value);
            order = 4,
            optional = true,
            description = "BigQuery Table Schema Path",
            helpText =
                "The Cloud Storage path to the BigQuery schema path. "
                    + "If this value isn't provided, then the schema is inferred from the Proto schema.",
            example = "gs://MyBucket/bq_schema.json")
        String getBigQueryTableSchemaPath();
        void setBigQueryTableSchemaPath(String value);
            order = 5,
            optional = true,
            description = "Pub/Sub output topic for UDF failures",
            helpText =
                "The Pub/Sub topic storing the UDF errors."
                    + " If this value isn't provided, UDF errors are sent to the same topic as `outputTopic`.",
            example = "projects/your-project-id/topics/your-topic-name")
        String getUdfOutputTopic();
        void setUdfOutputTopic(String udfOutputTopic);
        // Hide the UseStorageWriteApiAtLeastOnce in the UI, because it will automatically be turned
        // on when pipeline is running on ALO mode and using the Storage Write API
            order = 6,
            optional = true,
            parentName = "useStorageWriteApi",
            parentTriggerValues = {"true"},
            description = "Use at at-least-once semantics in BigQuery Storage Write API",
            helpText =
                "When using the Storage Write API, specifies the write semantics."
                    + " To use at-least-once semantics (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), set this parameter to true`. To use exactly-once semantics, set the parameter to `false`."
                    + " This parameter applies only when `useStorageWriteApi` is `true`. The default value is `false`.",
            hiddenUi = true)
        Boolean getUseStorageWriteApiAtLeastOnce();
        void setUseStorageWriteApiAtLeastOnce(Boolean value);
      /** Runs the pipeline and returns the results. */
      private static PipelineResult run(PubSubProtoToBigQueryOptions options) {
        Pipeline pipeline = Pipeline.create(options);
        Descriptor descriptor = getDescriptor(options);
        PCollection<String> maybeForUdf =
                .apply("Read From Pubsub", readPubsubMessages(options, descriptor))
                .apply("Dynamic Message to TableRow", new ConvertDynamicProtoMessageToJson(options));
        WriteResult writeResult =
            runUdf(maybeForUdf, options)
                .apply("Write to BigQuery", writeToBigQuery(options, descriptor));
        BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
                "Create Error Payload",
            .apply("Write Failed BQ Records", PubsubIO.writeMessages().to(options.getOutputTopic()));
        return pipeline.run();
      /** Gets the {@link Descriptor} for the message type in the Pub/Sub topic. */
      static Descriptor getDescriptor(PubSubProtoToBigQueryOptions options) {
        String schemaPath = options.getProtoSchemaPath();
        String messageName = options.getFullMessageName();
        Descriptor descriptor = SchemaUtils.getProtoDomain(schemaPath).getDescriptor(messageName);
        if (descriptor == null) {
          throw new IllegalArgumentException(
              messageName + " is not a recognized message in " + schemaPath);
        return descriptor;
      /** Returns the {@link PTransform} for reading Pub/Sub messages. */
      private static Read<DynamicMessage> readPubsubMessages(
          PubSubProtoToBigQueryOptions options, Descriptor descriptor) {
        return PubsubIO.readProtoDynamicMessages(descriptor)
       * Writes messages to BigQuery, creating the table if necessary and allowed in {@code options}.
       * <p>The BigQuery schema will be inferred from {@code descriptor} unless a JSON schema path is
       * specified in {@code options}.
      static Write<String> writeToBigQuery(
          PubSubProtoToBigQueryOptions options, Descriptor descriptor) {
        Write<String> write =
        String schemaPath = options.getBigQueryTableSchemaPath();
        if (Strings.isNullOrEmpty(schemaPath)) {
          return write.withSchema(
              SchemaUtils.createBigQuerySchema(descriptor, options.getPreserveProtoFieldNames()));
        } else {
          return write.withJsonSchema(GCSUtils.getGcsFileAsString(schemaPath));
      /** {@link PTransform} that handles converting {@link PubsubMessage} values to JSON. */
      private static class ConvertDynamicProtoMessageToJson
          extends PTransform<PCollection<DynamicMessage>, PCollection<String>> {
        private final boolean preserveProtoName;
        private ConvertDynamicProtoMessageToJson(PubSubProtoToBigQueryOptions options) {
          this.preserveProtoName = options.getPreserveProtoFieldNames();
        public PCollection<String> expand(PCollection<DynamicMessage> input) {
          return input.apply(
              "Map to JSON",
                      message -> {
                        try {
                          JsonFormat.Printer printer = JsonFormat.printer();
                          return preserveProtoName
                              ? printer.preservingProtoFieldNames().print(message)
                              : printer.print(message);
                        } catch (InvalidProtocolBufferException e) {
                          throw new RuntimeException(e);
       * Handles running the UDF.
       * <p>If {@code options} is configured so as not to run the UDF, then the UDF will not be called.
       * <p>This may add a branch to the pipeline for outputting failed UDF records to an unprocessed
       * topic.
       * @param jsonCollection {@link PCollection} of JSON strings for use as input to the UDF
       * @param options the options containing info on running the UDF
       * @return the {@link PCollection} of UDF output as JSON or {@code jsonCollection} if UDF not
       *     called
      static PCollection<String> runUdf(
          PCollection<String> jsonCollection, PubSubProtoToBigQueryOptions options) {
        boolean useJavascriptUdf = !Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath());
        boolean usePythonUdf = !Strings.isNullOrEmpty(options.getPythonExternalTextTransformGcsPath());
        // In order to avoid generating a graph that makes it look like a UDF was called when none was
        // intended, simply return the input as "success" output.
        if (!useJavascriptUdf && !usePythonUdf) {
          return jsonCollection;
        // For testing purposes, we need to do this check before creating the PTransform rather than
        // in `expand`. Otherwise, we get a NullPointerException due to the PTransform not returning
        // a value.
        if (useJavascriptUdf
            && Strings.isNullOrEmpty(options.getJavascriptTextTransformFunctionName())) {
          throw new IllegalArgumentException(
              "JavaScript function name cannot be null or empty if file is set");
        if (usePythonUdf
            && Strings.isNullOrEmpty(options.getPythonExternalTextTransformFunctionName())) {
          throw new IllegalArgumentException(
              "Python function name cannot be null or empty if file is set");
        if (usePythonUdf && useJavascriptUdf) {
          throw new IllegalArgumentException(
              "Either javascript or Python gcs path must be provided, but not both.");
        PCollectionTuple maybeSuccess;
        if (usePythonUdf) {
          maybeSuccess = jsonCollection.apply("Run UDF", new RunPythonUdf(options));
        } else {
          maybeSuccess = jsonCollection.apply("Run UDF", new RunUdf(options));
                "Get UDF Failures",
                ConvertFailsafeElementToPubsubMessage.<String, String>builder()
                    .setOriginalPayloadSerializeFn(s -> ArrayUtils.toObject(s.getBytes(UTF_8)))
            .apply("Write Failed UDF", writeUdfFailures(options));
        return maybeSuccess
                "Get UDF Output",
      /** {@link PTransform} that calls a UDF and returns both success and failure output. */
      private static class RunUdf extends PTransform<PCollection<String>, PCollectionTuple> {
        private final PubSubProtoToBigQueryOptions options;
        RunUdf(PubSubProtoToBigQueryOptions options) {
          this.options = options;
        public PCollectionTuple expand(PCollection<String> input) {
          return input
              .apply("Prepare Failsafe UDF", makeFailsafe())
                  "Call UDF",
        private static MapElements<String, FailsafeElement<String, String>> makeFailsafe() {
          return MapElements.into(new TypeDescriptor<FailsafeElement<String, String>>() {})
              .via((String json) -> FailsafeElement.of(json, json));
      /** {@link PTransform} that calls a python UDF and returns both success and failure output. */
      private static class RunPythonUdf extends PTransform<PCollection<String>, PCollectionTuple> {
        private final PubSubProtoToBigQueryOptions options;
        RunPythonUdf(PubSubProtoToBigQueryOptions options) {
          this.options = options;
        public PCollectionTuple expand(PCollection<String> input) {
          return input
              .apply("Prepare Failsafe row", stringMappingFunction())
                  ParDo.of(new RowToStringFailsafeElementFn(UDF_SUCCESS_TAG, UDF_FAILURE_TAG))
                      .withOutputTags(UDF_SUCCESS_TAG, TupleTagList.of(UDF_FAILURE_TAG)));
       * Returns a {@link PubsubIO.Write} configured to write UDF failures to the appropriate output
       * topic.
      private static PubsubIO.Write<PubsubMessage> writeUdfFailures(
          PubSubProtoToBigQueryOptions options) {
        PubsubIO.Write<PubsubMessage> write = PubsubIO.writeMessages();
        return Strings.isNullOrEmpty(options.getUdfOutputTopic())
            ? write.to(options.getOutputTopic())
            : write.to(options.getUdfOutputTopic());

