Vorlage "Pub/Sub für BigQuery"

Die Vorlage "Pub/Sub für BigQuery" ist eine Streamingpipeline, die Nachrichten im JSON-Format aus Pub/Sub liest und in eine BigQuery-Tabelle schreibt. Optional können Sie eine in JavaScript geschriebene benutzerdefinierte Funktion (User-Defined Function, UDF) bereitstellen, um die eingehenden Nachrichten zu verarbeiten.

Pipelineanforderungen

  • Die BigQuery-Tabelle muss vorhanden sein und ein Schema haben.
  • Die Pub/Sub-Nachrichtendaten müssen das JSON-Format verwenden oder Sie müssen eine UDF bereitstellen, die die Nachrichtendaten in JSON konvertiert. Die JSON-Daten müssen mit dem BigQuery-Tabellenschema übereinstimmen. Wenn die JSON-Nutzlasten beispielsweise als {"k1":"v1", "k2":"v2"} formatiert sind, muss die BigQuery-Tabelle zwei Stringspalten mit den Namen k1 und k2 haben.
  • Geben Sie den Parameter inputSubscription oder inputTopic an, aber nicht beides.

Vorlagenparameter

Erforderliche Parameter

  • outputTableSpec: Die BigQuery-Tabelle, in die Daten geschrieben werden sollen, formatiert als PROJECT_ID:DATASET_NAME.TABLE_NAME.

Optionale Parameter

  • inputTopic: Das Pub/Sub-Thema, aus dem gelesen werden soll, formatiert als projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • inputSubscription: Das Pub/Sub-Abo, aus dem gelesen werden soll, formatiert als projects/<PROJECT_ID>/subscriptions/<SUBCRIPTION_NAME>.
  • outputDeadletterTable: Die BigQuery-Tabelle, die für Nachrichten verwendet werden soll, die die Ausgabetabelle nicht erreicht haben, formatiert als PROJECT_ID:DATASET_NAME.TABLE_NAME. Wenn die Tabelle nicht vorhanden ist, wird sie beim Ausführen der Pipeline erstellt. Wenn dieser Parameter nicht angegeben ist, wird stattdessen der Wert OUTPUT_TABLE_SPEC_error_records verwendet.
  • useStorageWriteApiAtLeastOnce: Gibt bei Verwendung der Storage Write API die Schreibsemantik an. Wenn Sie die „Mindestens einmal“-Semantik verwenden (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), legen Sie diesen Parameter auf „true“ fest. Wenn Sie die „Genau einmal“-Semantik verwenden möchten, legen Sie den Parameter auf false fest. Dieser Parameter gilt nur, wenn useStorageWriteApi true ist. Der Standardwert ist false.
  • useStorageWriteApi: Wenn „true“, verwendet die Pipeline die BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api). Der Standardwert ist false. Weitere Informationen finden Sie unter „Storage Write API verwenden“ (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams: Gibt bei Verwendung der Storage Write API die Anzahl der Schreibstreams an. Wenn useStorageWriteApi true und useStorageWriteApiAtLeastOnce false ist, müssen Sie diesen Parameter festlegen. Die Standardeinstellung ist 0.
  • storageWriteApiTriggeringFrequencySec: Wenn Sie die Storage Write API verwenden, wird die Triggerhäufigkeit in Sekunden angegeben. Wenn useStorageWriteApi true und useStorageWriteApiAtLeastOnce false ist, müssen Sie diesen Parameter festlegen.
  • javascriptTextTransformGcsPath: Der Cloud Storage-URI der .js-Datei, in der die zu verwendende benutzerdefinierte JavaScript-Funktion (UDF) definiert wird. Beispiel: gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: Der Name der benutzerdefinierten JavaScript-Funktion (UDF), die verwendet werden soll. Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter „UDF-Beispiele“ (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: Gibt an, wie oft die UDF aktualisiert werden soll (in Minuten). Wenn der Wert größer als 0 ist, prüft Dataflow regelmäßig die UDF-Datei in Cloud Storage und lädt die UDF neu, wenn die Datei geändert wurde. Mit diesem Parameter können Sie die UDF aktualisieren, während die Pipeline ausgeführt wird, ohne den Job neu starten zu müssen. Wenn der Wert 0 ist, ist das Neuladen der UDF deaktiviert. Der Standardwert ist 0.

Benutzerdefinierte Funktion

Optional können Sie diese Vorlage erweitern, indem Sie eine benutzerdefinierte Funktion (UDF) schreiben. Die Vorlage ruft die UDF für jedes Eingabeelement auf. Nutzlasten von Elementen werden als JSON-Strings serialisiert. Weitere Informationen finden Sie unter Benutzerdefinierte Funktionen für Dataflow-Vorlagen erstellen.

Funktionsspezifikation

UDFs haben die folgende Spezifikation:

  • Eingabe: Das Feld der Pub/Sub-Nachrichtendaten, das als JSON-String serialisiert ist.
  • Ausgabe: Ein JSON-String, der mit dem Schema der BigQuery-Zieltabelle übereinstimmt.
  • Führen Sie die Vorlage aus.

    1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
    2. Zur Seite "Job aus Vorlage erstellen“
    3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
    4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

      Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

    5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub to BigQuery templateaus.
    6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
    7. Optional: Wählen Sie Mindestens einmal aus, um von der genau einmaligen Verarbeitung zum Mindestens einmal-Streamingmodus zu wechseln.
    8. Klicken Sie auf Job ausführen.

    Führen Sie die Vorlage in der Shell oder im Terminal aus:

    gcloud dataflow flex-template run JOB_NAME \
        --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_BigQuery_Flex \
        --template-file-gcs-location REGION_NAME \
        --staging-location STAGING_LOCATION \
        --parameters \
    inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
    outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME

    Ersetzen Sie dabei Folgendes:

    • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
    • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
    • VERSION: Die Version der Vorlage, die Sie verwenden möchten

      Sie können die folgenden Werte verwenden:

      • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
      • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
    • STAGING_LOCATION: der Speicherort für das Staging lokaler Dateien (z. B. gs://your-bucket/staging)
    • TOPIC_NAME: Der Name Ihres Pub/Sub-Themas
    • DATASET: Ihr BigQuery-Dataset
    • TABLE_NAME: Ihr BigQuery-Tabellenname

    Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
           "inputTopic": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
           "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_BigQuery_Flex",
       }
    }

    Ersetzen Sie dabei Folgendes:

    • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
    • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
    • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
    • VERSION: Die Version der Vorlage, die Sie verwenden möchten

      Sie können die folgenden Werte verwenden:

      • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
      • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
    • STAGING_LOCATION: der Speicherort für das Staging lokaler Dateien (z. B. gs://your-bucket/staging)
    • TOPIC_NAME: Der Name Ihres Pub/Sub-Themas
    • DATASET: Ihr BigQuery-Dataset
    • TABLE_NAME: Ihr BigQuery-Tabellenname
    Java
    /*
     * Copyright (C) 2018 Google LLC
     *
     * Licensed under the Apache License, Version 2.0 (the "License"); you may not
     * use this file except in compliance with the License. You may obtain a copy of
     * the License at
     *
     *   http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
     * License for the specific language governing permissions and limitations under
     * the License.
     */
    package com.google.cloud.teleport.v2.templates;
    
    import static com.google.cloud.teleport.v2.templates.TextToBigQueryStreaming.wrapBigQueryInsertError;
    
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.cloud.teleport.metadata.MultiTemplate;
    import com.google.cloud.teleport.metadata.Template;
    import com.google.cloud.teleport.metadata.TemplateCategory;
    import com.google.cloud.teleport.metadata.TemplateParameter;
    import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
    import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
    import com.google.cloud.teleport.v2.options.BigQueryStorageApiStreamingOptions;
    import com.google.cloud.teleport.v2.templates.PubSubToBigQuery.Options;
    import com.google.cloud.teleport.v2.transforms.BigQueryConverters.FailsafeJsonToTableRow;
    import com.google.cloud.teleport.v2.transforms.ErrorConverters;
    import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.FailsafeJavascriptUdf;
    import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer;
    import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer.PythonExternalTextTransformerOptions;
    import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer.RowToPubSubFailsafeElementFn;
    import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
    import com.google.cloud.teleport.v2.utils.ResourceUtils;
    import com.google.cloud.teleport.v2.values.FailsafeElement;
    import com.google.common.base.Strings;
    import com.google.common.collect.ImmutableList;
    import java.nio.charset.StandardCharsets;
    import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.coders.CoderRegistry;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
    import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
    import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.Flatten;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionList;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.Row;
    import org.apache.beam.sdk.values.TupleTag;
    import org.apache.beam.sdk.values.TupleTagList;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * The {@link PubSubToBigQuery} pipeline is a streaming pipeline which ingests data in JSON format
     * from Cloud Pub/Sub, executes a UDF, and outputs the resulting records to BigQuery. Any errors
     * which occur in the transformation of the data or execution of the UDF will be output to a
     * separate errors table in BigQuery. The errors table will be created if it does not exist prior to
     * execution. Both output and error tables are specified by the user as template parameters.
     *
     * <p><b>Pipeline Requirements</b>
     *
     * <ul>
     *   <li>The Pub/Sub topic exists.
     *   <li>The BigQuery output table exists.
     * </ul>
     *
     * <p>Check out <a
     * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-googlecloud/README_PubSub_to_BigQuery_Flex.md">README</a>
     * for instructions on how to use or modify this template.
     */
    @MultiTemplate({
      @Template(
          name = "PubSub_to_BigQuery_Flex",
          category = TemplateCategory.STREAMING,
          displayName = "Pub/Sub to BigQuery",
          description =
              "The Pub/Sub to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Pub/Sub topic or subscription, and writes them to a BigQuery table. "
                  + "You can use the template as a quick solution to move Pub/Sub data to BigQuery. "
                  + "The template reads JSON-formatted messages from Pub/Sub and converts them to BigQuery elements.",
          optionsClass = Options.class,
          skipOptions = {
            "pythonExternalTextTransformGcsPath",
            "pythonExternalTextTransformFunctionName",
          },
          flexContainerName = "pubsub-to-bigquery",
          documentation =
              "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-bigquery",
          contactInformation = "https://cloud.google.com/support",
          requirements = {
            "The <a href=\"https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage\">`data` field</a> of Pub/Sub messages must use the JSON format, described in this <a href=\"https://developers.google.com/api-client-library/java/google-http-java-client/json\">JSON guide</a>. For example, messages with values in the `data` field formatted as `{\"k1\":\"v1\", \"k2\":\"v2\"}` can be inserted into a BigQuery table with two columns, named `k1` and `k2`, with a string data type.",
            "The output table must exist prior to running the pipeline. The table schema must match the input JSON objects."
          },
          streaming = true,
          supportsAtLeastOnce = true,
          supportsExactlyOnce = true),
      @Template(
          name = "PubSub_to_BigQuery_Xlang",
          category = TemplateCategory.STREAMING,
          displayName = "Pub/Sub to BigQuery with Python UDFs",
          type = Template.TemplateType.XLANG,
          description =
              "The Pub/Sub to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Pub/Sub topic or subscription, and writes them to a BigQuery table. "
                  + "You can use the template as a quick solution to move Pub/Sub data to BigQuery. "
                  + "The template reads JSON-formatted messages from Pub/Sub and converts them to BigQuery elements.",
          optionsClass = Options.class,
          skipOptions = {
            "javascriptTextTransformGcsPath",
            "javascriptTextTransformFunctionName",
            "javascriptTextTransformReloadIntervalMinutes"
          },
          flexContainerName = "pubsub-to-bigquery-xlang",
          documentation =
              "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-bigquery",
          contactInformation = "https://cloud.google.com/support",
          requirements = {
            "The <a href=\"https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage\">`data` field</a> of Pub/Sub messages must use the JSON format, described in this <a href=\"https://developers.google.com/api-client-library/java/google-http-java-client/json\">JSON guide</a>. For example, messages with values in the `data` field formatted as `{\"k1\":\"v1\", \"k2\":\"v2\"}` can be inserted into a BigQuery table with two columns, named `k1` and `k2`, with a string data type.",
            "The output table must exist prior to running the pipeline. The table schema must match the input JSON objects."
          },
          streaming = true,
          supportsAtLeastOnce = true,
          supportsExactlyOnce = true)
    })
    public class PubSubToBigQuery {
    
      /** The log to output status messages to. */
      private static final Logger LOG = LoggerFactory.getLogger(PubSubToBigQuery.class);
    
      /** The tag for the main output for the UDF. */
      public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_OUT =
          new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
    
      /** The tag for the main output of the json transformation. */
      public static final TupleTag<TableRow> TRANSFORM_OUT = new TupleTag<TableRow>() {};
    
      /** The tag for the dead-letter output of the udf. */
      public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_DEADLETTER_OUT =
          new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
    
      /** The tag for the dead-letter output of the json to table row transform. */
      public static final TupleTag<FailsafeElement<PubsubMessage, String>> TRANSFORM_DEADLETTER_OUT =
          new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
    
      /** The default suffix for error tables if dead letter table is not specified. */
      public static final String DEFAULT_DEADLETTER_TABLE_SUFFIX = "_error_records";
    
      /** Pubsub message/string coder for pipeline. */
      public static final FailsafeElementCoder<PubsubMessage, String> CODER =
          FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());
    
      /** String/String Coder for FailsafeElement. */
      public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
          FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
    
      /**
       * The {@link Options} class provides the custom execution options passed by the executor at the
       * command-line.
       */
      public interface Options
          extends PipelineOptions,
              BigQueryStorageApiStreamingOptions,
              PythonExternalTextTransformerOptions,
              DataflowPipelineWorkerPoolOptions {
        @TemplateParameter.BigQueryTable(
            order = 1,
            groupName = "Target",
            description = "BigQuery output table",
            helpText =
                "The BigQuery table to write to, formatted as `PROJECT_ID:DATASET_NAME.TABLE_NAME`.")
        String getOutputTableSpec();
    
        void setOutputTableSpec(String value);
    
        @TemplateParameter.PubsubTopic(
            order = 2,
            groupName = "Source",
            optional = true,
            description = "Input Pub/Sub topic",
            helpText =
                "The Pub/Sub topic to read from, formatted as `projects/<PROJECT_ID>/topics/<TOPIC_NAME>`.")
        String getInputTopic();
    
        void setInputTopic(String value);
    
        @TemplateParameter.PubsubSubscription(
            order = 3,
            groupName = "Source",
            optional = true,
            description = "Pub/Sub input subscription",
            helpText =
                "The Pub/Sub subscription to read from, "
                    + "formatted as `projects/<PROJECT_ID>/subscriptions/<SUBCRIPTION_NAME>`.")
        String getInputSubscription();
    
        void setInputSubscription(String value);
    
        @TemplateParameter.BigQueryTable(
            order = 4,
            optional = true,
            description =
                "Table for messages failed to reach the output table (i.e., Deadletter table)",
            helpText =
                "The BigQuery table to use for messages that failed to reach the output table, "
                    + "formatted as `PROJECT_ID:DATASET_NAME.TABLE_NAME`. If the table "
                    + "doesn't exist, it is created when the pipeline runs. "
                    + "If this parameter is not specified, "
                    + "the value `OUTPUT_TABLE_SPEC_error_records` is used instead.")
        String getOutputDeadletterTable();
    
        void setOutputDeadletterTable(String value);
    
        @TemplateParameter.Boolean(
            order = 5,
            optional = true,
            parentName = "useStorageWriteApi",
            parentTriggerValues = {"true"},
            description = "Use at at-least-once semantics in BigQuery Storage Write API",
            helpText =
                "When using the Storage Write API, specifies the write semantics. "
                    + "To use at-least-once semantics (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics)"
                    + ", set this parameter to true. "
                    + "To use exactly-once semantics, set the parameter to `false`. "
                    + "This parameter applies only when `useStorageWriteApi` is `true`. "
                    + "The default value is `false`.")
        @Default.Boolean(false)
        @Override
        Boolean getUseStorageWriteApiAtLeastOnce();
    
        void setUseStorageWriteApiAtLeastOnce(Boolean value);
      }
    
      /**
       * The main entry-point for pipeline execution. This method will start the pipeline but will not
       * wait for it's execution to finish. If blocking execution is required, use the {@link
       * PubSubToBigQuery#run(Options)} method to start the pipeline and invoke {@code
       * result.waitUntilFinish()} on the {@link PipelineResult}.
       *
       * @param args The command-line args passed by the executor.
       */
      public static void main(String[] args) {
        UncaughtExceptionLogger.register();
    
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
        BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);
        //    options.setWorkerDiskType(
        //
        // "compute.googleapis.com/projects/cloud-teleport-testing/zones/us-central1-a/diskTypes/t2a-test");
    
        run(options);
      }
    
      /**
       * Runs the pipeline to completion with the specified options. This method does not wait until the
       * pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
       * object to block until the pipeline is finished running if blocking programmatic execution is
       * required.
       *
       * @param options The execution options.
       * @return The pipeline result.
       */
      public static PipelineResult run(Options options) {
    
        boolean useInputSubscription = !Strings.isNullOrEmpty(options.getInputSubscription());
        boolean useInputTopic = !Strings.isNullOrEmpty(options.getInputTopic());
        if (useInputSubscription == useInputTopic) {
          throw new IllegalArgumentException(
              "Either input topic or input subscription must be provided, but not both.");
        }
    
        Pipeline pipeline = Pipeline.create(options);
    
        CoderRegistry coderRegistry = pipeline.getCoderRegistry();
        coderRegistry.registerCoderForType(CODER.getEncodedTypeDescriptor(), CODER);
    
        /*
         * Steps:
         *  1) Read messages in from Pub/Sub
         *  2) Transform the PubsubMessages into TableRows
         *     - Transform message payload via UDF
         *     - Convert UDF result to TableRow objects
         *  3) Write successful records out to BigQuery
         *  4) Write failed records out to BigQuery
         */
    
        /*
         * Step #1: Read messages in from Pub/Sub
         * Either from a Subscription or Topic
         */
    
        PCollection<PubsubMessage> messages = null;
        if (useInputSubscription) {
          messages =
              pipeline.apply(
                  "ReadPubSubSubscription",
                  PubsubIO.readMessagesWithAttributes()
                      .fromSubscription(options.getInputSubscription()));
        } else {
          messages =
              pipeline.apply(
                  "ReadPubSubTopic",
                  PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()));
        }
    
        PCollectionTuple convertedTableRows =
            messages
                /*
                 * Step #2: Transform the PubsubMessages into TableRows
                 */
                .apply("ConvertMessageToTableRow", new PubsubMessageToTableRow(options));
    
        /*
         * Step #3: Write the successful records out to BigQuery
         */
        WriteResult writeResult =
            convertedTableRows
                .get(TRANSFORM_OUT)
                .apply(
                    "WriteSuccessfulRecords",
                    BigQueryIO.writeTableRows()
                        .withoutValidation()
                        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                        .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                        .withExtendedErrorInfo()
                        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                        .to(options.getOutputTableSpec()));
    
        /*
         * Step 3 Contd.
         * Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
         */
        PCollection<FailsafeElement<String, String>> failedInserts =
            BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
                .apply(
                    "WrapInsertionErrors",
                    MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                        .via((BigQueryInsertError e) -> wrapBigQueryInsertError(e)))
                .setCoder(FAILSAFE_ELEMENT_CODER);
    
        /*
         * Step #4: Write records that failed table row transformation
         * or conversion out to BigQuery deadletter table.
         */
        PCollectionList.of(
                ImmutableList.of(
                    convertedTableRows.get(UDF_DEADLETTER_OUT),
                    convertedTableRows.get(TRANSFORM_DEADLETTER_OUT)))
            .apply("Flatten", Flatten.pCollections())
            .apply(
                "WriteFailedRecords",
                ErrorConverters.WritePubsubMessageErrors.newBuilder()
                    .setErrorRecordsTable(
                        !Strings.isNullOrEmpty(options.getOutputDeadletterTable())
                            ? options.getOutputDeadletterTable()
                            : options.getOutputTableSpec() + DEFAULT_DEADLETTER_TABLE_SUFFIX)
                    .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
                    .build());
    
        // 5) Insert records that failed insert into deadletter table
        failedInserts.apply(
            "WriteFailedRecords",
            ErrorConverters.WriteStringMessageErrors.newBuilder()
                .setErrorRecordsTable(
                    !Strings.isNullOrEmpty(options.getOutputDeadletterTable())
                        ? options.getOutputDeadletterTable()
                        : options.getOutputTableSpec() + DEFAULT_DEADLETTER_TABLE_SUFFIX)
                .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
                .build());
    
        return pipeline.run();
      }
    
      /**
       * The {@link PubsubMessageToTableRow} class is a {@link PTransform} which transforms incoming
       * {@link PubsubMessage} objects into {@link TableRow} objects for insertion into BigQuery while
       * applying an optional UDF to the input. The executions of the UDF and transformation to {@link
       * TableRow} objects is done in a fail-safe way by wrapping the element with it's original payload
       * inside the {@link FailsafeElement} class. The {@link PubsubMessageToTableRow} transform will
       * output a {@link PCollectionTuple} which contains all output and dead-letter {@link
       * PCollection}.
       *
       * <p>The {@link PCollectionTuple} output will contain the following {@link PCollection}:
       *
       * <ul>
       *   <li>{@link PubSubToBigQuery#UDF_OUT} - Contains all {@link FailsafeElement} records
       *       successfully processed by the optional UDF.
       *   <li>{@link PubSubToBigQuery#UDF_DEADLETTER_OUT} - Contains all {@link FailsafeElement}
       *       records which failed processing during the UDF execution.
       *   <li>{@link PubSubToBigQuery#TRANSFORM_OUT} - Contains all records successfully converted from
       *       JSON to {@link TableRow} objects.
       *   <li>{@link PubSubToBigQuery#TRANSFORM_DEADLETTER_OUT} - Contains all {@link FailsafeElement}
       *       records which couldn't be converted to table rows.
       * </ul>
       */
      static class PubsubMessageToTableRow
          extends PTransform<PCollection<PubsubMessage>, PCollectionTuple> {
    
        private final Options options;
    
        PubsubMessageToTableRow(Options options) {
          this.options = options;
        }
    
        @Override
        public PCollectionTuple expand(PCollection<PubsubMessage> input) {
          boolean useJavascriptUdf =
              !Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath());
          boolean usePythonUdf =
              !Strings.isNullOrEmpty(options.getPythonExternalTextTransformGcsPath());
          if (useJavascriptUdf && usePythonUdf) {
            throw new IllegalArgumentException(
                "Either javascript or Python gcs path must be provided, but not both.");
          }
          PCollectionTuple udfOut;
          if (usePythonUdf) {
            PCollection<Row> udfRowsOut =
                input
                    // Map the incoming messages into FailsafeElements so we can recover from failures
                    // across multiple transforms.
                    .apply(
                        "MapToRecord",
                        PythonExternalTextTransformer.FailsafeRowPythonExternalUdf
                            .pubSubMappingFunction())
                    .setRowSchema(PythonExternalTextTransformer.FailsafeRowPythonExternalUdf.ROW_SCHEMA)
                    .apply(
                        "InvokeUDF",
                        PythonExternalTextTransformer.FailsafePythonExternalUdf.newBuilder()
                            .setFileSystemPath(options.getPythonExternalTextTransformGcsPath())
                            .setFunctionName(options.getPythonExternalTextTransformFunctionName())
                            .build());
            udfOut =
                udfRowsOut.apply(
                    "MapRowsToFailsafeElements",
                    ParDo.of(new RowToPubSubFailsafeElementFn(UDF_OUT, UDF_DEADLETTER_OUT))
                        .withOutputTags(UDF_OUT, TupleTagList.of(UDF_DEADLETTER_OUT)));
          } else {
            udfOut =
                input
                    // Map the incoming messages into FailsafeElements so we can recover from failures
                    // across multiple transforms.
                    .apply("MapToRecord", ParDo.of(new PubsubMessageToFailsafeElementFn()))
                    .apply(
                        "InvokeUDF",
                        FailsafeJavascriptUdf.<PubsubMessage>newBuilder()
                            .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
                            .setFunctionName(options.getJavascriptTextTransformFunctionName())
                            .setReloadIntervalMinutes(
                                options.getJavascriptTextTransformReloadIntervalMinutes())
                            .setSuccessTag(UDF_OUT)
                            .setFailureTag(UDF_DEADLETTER_OUT)
                            .build());
          }
    
          // Convert the records which were successfully processed by the UDF into TableRow objects.
          PCollectionTuple jsonToTableRowOut =
              udfOut
                  .get(UDF_OUT)
                  .apply(
                      "JsonToTableRow",
                      FailsafeJsonToTableRow.<PubsubMessage>newBuilder()
                          .setSuccessTag(TRANSFORM_OUT)
                          .setFailureTag(TRANSFORM_DEADLETTER_OUT)
                          .build());
    
          // Re-wrap the PCollections so we can return a single PCollectionTuple
          return PCollectionTuple.of(UDF_OUT, udfOut.get(UDF_OUT))
              .and(UDF_DEADLETTER_OUT, udfOut.get(UDF_DEADLETTER_OUT))
              .and(TRANSFORM_OUT, jsonToTableRowOut.get(TRANSFORM_OUT))
              .and(TRANSFORM_DEADLETTER_OUT, jsonToTableRowOut.get(TRANSFORM_DEADLETTER_OUT));
        }
      }
    
      /**
       * The {@link PubsubMessageToFailsafeElementFn} wraps an incoming {@link PubsubMessage} with the
       * {@link FailsafeElement} class so errors can be recovered from and the original message can be
       * output to a error records table.
       */
      static class PubsubMessageToFailsafeElementFn
          extends DoFn<PubsubMessage, FailsafeElement<PubsubMessage, String>> {
        @ProcessElement
        public void processElement(ProcessContext context) {
          PubsubMessage message = context.element();
          context.output(
              FailsafeElement.of(message, new String(message.getPayload(), StandardCharsets.UTF_8)));
        }
      }
    }
    

    Nächste Schritte