Pub/Sub Subscription to BigQuery template

The Pub/Sub Subscription to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Pub/Sub subscription and writes them to a BigQuery table. You can use the template as a quick solution to move Pub/Sub data to BigQuery. The template reads JSON-formatted messages from Pub/Sub and converts them to BigQuery elements.

Pipeline requirements

  • The data field of Pub/Sub messages must use the JSON format, described in this JSON guide. For example, messages with values in the data field formatted as {"k1":"v1", "k2":"v2"} can be inserted into a BigQuery table with two columns, named k1 and k2, with a string data type.
  • The output table must exist prior to running the pipeline, and its schema must match the input JSON objects; see the example after this list.
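
For example, assuming the {"k1":"v1", "k2":"v2"} message shape above, the output table could be created ahead of time with the bq command-line tool. This is a minimal sketch; the project, dataset, and table names are placeholders:

    # Create a table with two STRING columns matching the JSON keys.
    bq mk --table \
        my-project:my_dataset.my_table \
        k1:STRING,k2:STRING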

Template parameters

Required parameters

  • outputTableSpec: The BigQuery output table location, in the format <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>.
  • inputSubscription: The Pub/Sub input subscription to read from, in the format projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION>.

Optional parameters

  • outputDeadletterTable: The BigQuery table to use for messages that fail to reach the output table, in the format <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>. If the table doesn't exist, it is created during pipeline execution. If not specified, OUTPUT_TABLE_SPEC_error_records is used; for example, if outputTableSpec is my-project:my_dataset.my_table, failed messages go to my-project:my_dataset.my_table_error_records.
  • javascriptTextTransformGcsPath: The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) to use. For example, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: The name of the JavaScript user-defined function (UDF) to use. For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, the function name is myTransform. For sample JavaScript UDFs, see UDF Examples (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: Defines the interval, in minutes, at which workers check for changes to the JavaScript UDF and reload the file. Defaults to 0.

User-defined functions

Optionally, you can extend this template by writing a user-defined function (UDF). The template calls the UDF for each input element. Element payloads are serialized as JSON strings. For more information, see Create user-defined functions for Dataflow templates.

Function specification

The UDF has the following specification (a sketch follows the list):

  • Input: the Pub/Sub message data field, serialized as a JSON string.
  • Output: a JSON string that matches the schema of the BigQuery destination table.
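
For example, the following is a minimal sketch of a JavaScript UDF that satisfies this contract. The function name follows the myTransform convention used above; the added processed_at field is hypothetical and would require a matching column in the destination table. Upload the file to Cloud Storage and reference it with the javascriptTextTransformGcsPath and javascriptTextTransformFunctionName parameters:

    /**
     * Parses the incoming Pub/Sub payload, adds a hypothetical
     * processed_at field, and returns a JSON string that matches
     * the destination table schema.
     */
    function myTransform(inJson) {
      var obj = JSON.parse(inJson);
      obj.processed_at = new Date().toISOString();
      return JSON.stringify(obj);
    }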
Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

     For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Pub/Sub Subscription to BigQuery template.
  5. In the provided parameter fields, enter your parameter values.
  6. Optional: To switch from exactly-once processing to at-least-once streaming mode, select At Least Once.
  7. Click Run job.

gcloud

In your shell or terminal, run the template:

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/PubSub_Subscription_to_BigQuery \
        --region REGION_NAME \
        --staging-location STAGING_LOCATION \
        --parameters \
    inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
    outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
    outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • REGION_NAME: the region where you want to deploy your Dataflow job, for example us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
    • the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which is nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/

  • STAGING_LOCATION: the location for staging local files (for example, gs://your-bucket/staging)
  • SUBSCRIPTION_NAME: your Pub/Sub subscription name
  • DATASET: your BigQuery dataset
  • TABLE_NAME: your BigQuery table name
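
For example, a fully substituted command might look like the following; all resource names here are hypothetical:

    gcloud dataflow jobs run ps-to-bq-job \
        --gcs-location gs://dataflow-templates-us-central1/latest/PubSub_Subscription_to_BigQuery \
        --region us-central1 \
        --staging-location gs://my-bucket/staging \
        --parameters \
    inputSubscription=projects/my-project/subscriptions/my-subscription,\
    outputTableSpec=my-project:my_dataset.my_table,\
    outputDeadletterTable=my-project:my_dataset.my_table_error_records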

API

To run the template using the REST API, send an HTTP POST request. For more information about the API and its authorization scopes, see projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/PubSub_Subscription_to_BigQuery
    {
       "jobName": "JOB_NAME",
       "parameters": {
           "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
           "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
       },
       "environment": {
           "ipConfiguration": "WORKER_IP_UNSPECIFIED",
           "additionalExperiments": []
       }
    }

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • LOCATION: the region where you want to deploy your Dataflow job, for example us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-LOCATION/latest/
    • the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which is nested in the respective dated parent folder in the bucket: gs://dataflow-templates-LOCATION/

  • SUBSCRIPTION_NAME: your Pub/Sub subscription name
  • DATASET: your BigQuery dataset
  • TABLE_NAME: your BigQuery table name
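
For example, the request can be sent with curl, using a token from the gcloud CLI for authentication. This is a sketch with the same hypothetical resource names as above:

    curl -X POST \
        -H "Authorization: Bearer $(gcloud auth print-access-token)" \
        -H "Content-Type: application/json" \
        -d '{
              "jobName": "ps-to-bq-job",
              "parameters": {
                "inputSubscription": "projects/my-project/subscriptions/my-subscription",
                "outputTableSpec": "my-project:my_dataset.my_table"
              }
            }' \
        "https://dataflow.googleapis.com/v1b3/projects/my-project/locations/us-central1/templates:launch?gcsPath=gs://dataflow-templates-us-central1/latest/PubSub_Subscription_to_BigQuery"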
Template source code

Java
    /*
     * Copyright (C) 2018 Google LLC
     *
     * Licensed under the Apache License, Version 2.0 (the "License"); you may not
     * use this file except in compliance with the License. You may obtain a copy of
     * the License at
     *
     *   http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
     * License for the specific language governing permissions and limitations under
     * the License.
     */
    package com.google.cloud.teleport.templates;
    
    import static com.google.cloud.teleport.templates.TextToBigQueryStreaming.wrapBigQueryInsertError;
    
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.cloud.teleport.coders.FailsafeElementCoder;
    import com.google.cloud.teleport.metadata.Template;
    import com.google.cloud.teleport.metadata.TemplateCategory;
    import com.google.cloud.teleport.metadata.TemplateCreationParameter;
    import com.google.cloud.teleport.metadata.TemplateParameter;
    import com.google.cloud.teleport.templates.PubSubToBigQuery.Options;
    import com.google.cloud.teleport.templates.common.BigQueryConverters.FailsafeJsonToTableRow;
    import com.google.cloud.teleport.templates.common.ErrorConverters;
    import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.FailsafeJavascriptUdf;
    import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.JavascriptTextTransformerOptions;
    import com.google.cloud.teleport.util.DualInputNestedValueProvider;
    import com.google.cloud.teleport.util.DualInputNestedValueProvider.TranslatorInput;
    import com.google.cloud.teleport.util.ResourceUtils;
    import com.google.cloud.teleport.util.ValueProviderUtils;
    import com.google.cloud.teleport.values.FailsafeElement;
    import com.google.common.collect.ImmutableList;
    import java.nio.charset.StandardCharsets;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.coders.CoderRegistry;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
    import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
    import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.options.ValueProvider;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.Flatten;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionList;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.TupleTag;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * The {@link PubSubToBigQuery} pipeline is a streaming pipeline which ingests data in JSON format
     * from Cloud Pub/Sub, executes a UDF, and outputs the resulting records to BigQuery. Any errors
     * which occur in the transformation of the data or execution of the UDF will be output to a
     * separate errors table in BigQuery. The errors table will be created if it does not exist prior to
     * execution. Both output and error tables are specified by the user as template parameters.
     *
     * <p><b>Pipeline Requirements</b>
     *
     * <ul>
     *   <li>The Pub/Sub topic exists.
     *   <li>The BigQuery output table exists.
     * </ul>
     *
     * <p>Check out <a
     * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_PubSub_Subscription_to_BigQuery.md">README
     * for Subscription</a> or <a
     * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_PubSub_to_BigQuery.md">README
     * for Topic</a> for instructions on how to use or modify this template.
     */
    @Template(
        name = "PubSub_Subscription_to_BigQuery",
        category = TemplateCategory.STREAMING,
        displayName = "Pub/Sub Subscription to BigQuery",
        description =
            "The Pub/Sub Subscription to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Pub/Sub subscription and writes them to a BigQuery table. "
                + "You can use the template as a quick solution to move Pub/Sub data to BigQuery. "
                + "The template reads JSON-formatted messages from Pub/Sub and converts them to BigQuery elements.",
        optionsClass = Options.class,
        skipOptions = "inputTopic",
        documentation =
            "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-subscription-to-bigquery",
        contactInformation = "https://cloud.google.com/support",
        requirements = {
          "The <a href=\"https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage\">`data` field</a> of Pub/Sub messages must use the JSON format, described in this <a href=\"https://developers.google.com/api-client-library/java/google-http-java-client/json\">JSON guide</a>. For example, messages with values in the `data` field formatted as `{\"k1\":\"v1\", \"k2\":\"v2\"}` can be inserted into a BigQuery table with two columns, named `k1` and `k2`, with a string data type.",
          "The output table must exist prior to running the pipeline. The table schema must match the input JSON objects."
        },
        streaming = true,
        supportsAtLeastOnce = true,
        supportsExactlyOnce = true)
    @Template(
        name = "PubSub_to_BigQuery",
        category = TemplateCategory.STREAMING,
        displayName = "Pub/Sub Topic to BigQuery",
        description =
            "The Pub/Sub Topic to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Pub/Sub topic and writes them to a BigQuery table. "
                + "You can use the template as a quick solution to move Pub/Sub data to BigQuery. "
                + "The template reads JSON-formatted messages from Pub/Sub and converts them to BigQuery elements.",
        optionsClass = Options.class,
        skipOptions = "inputSubscription",
        documentation =
            "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-bigquery",
        contactInformation = "https://cloud.google.com/support",
        requirements = {
          "The <a href=\"https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage\">`data` field</a> of Pub/Sub messages must use the JSON format, described in this <a href=\"https://developers.google.com/api-client-library/java/google-http-java-client/json\">JSON guide</a>. For example, messages with values in the `data` field formatted as `{\"k1\":\"v1\", \"k2\":\"v2\"}` can be inserted into a BigQuery table with two columns, named `k1` and `k2`, with a string data type.",
          "The output table must exist prior to running the pipeline. The table schema must match the input JSON objects."
        },
        hidden = true,
        streaming = true,
        supportsAtLeastOnce = true)
    public class PubSubToBigQuery {
    
      /** The log to output status messages to. */
      private static final Logger LOG = LoggerFactory.getLogger(PubSubToBigQuery.class);
    
      /** The tag for the main output for the UDF. */
      public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_OUT =
          new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
    
      /** The tag for the main output of the json transformation. */
      public static final TupleTag<TableRow> TRANSFORM_OUT = new TupleTag<TableRow>() {};
    
      /** The tag for the dead-letter output of the udf. */
      public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_DEADLETTER_OUT =
          new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
    
      /** The tag for the dead-letter output of the json to table row transform. */
      public static final TupleTag<FailsafeElement<PubsubMessage, String>> TRANSFORM_DEADLETTER_OUT =
          new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
    
      /** The default suffix for error tables if dead letter table is not specified. */
      public static final String DEFAULT_DEADLETTER_TABLE_SUFFIX = "_error_records";
    
      /** Pubsub message/string coder for pipeline. */
      public static final FailsafeElementCoder<PubsubMessage, String> CODER =
          FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());
    
      /** String/String Coder for FailsafeElement. */
      public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
          FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
    
      /**
       * The {@link Options} class provides the custom execution options passed by the executor at the
       * command-line.
       */
      public interface Options extends PipelineOptions, JavascriptTextTransformerOptions {
        @TemplateParameter.BigQueryTable(
            order = 1,
            description = "BigQuery output table",
            helpText =
                "The BigQuery output table location, in the format `<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>`")
        ValueProvider<String> getOutputTableSpec();
    
        void setOutputTableSpec(ValueProvider<String> value);
    
        @TemplateParameter.PubsubTopic(
            order = 2,
            description = "Input Pub/Sub topic",
            helpText = "The Pub/Sub topic to read the input from.")
        ValueProvider<String> getInputTopic();
    
        void setInputTopic(ValueProvider<String> value);
    
        @TemplateParameter.PubsubSubscription(
            order = 3,
            description = "Pub/Sub input subscription",
            helpText =
                "The Pub/Sub input subscription to read from, in the format `projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION>`.")
        ValueProvider<String> getInputSubscription();
    
        void setInputSubscription(ValueProvider<String> value);
    
        @TemplateCreationParameter(template = "PubSub_to_BigQuery", value = "false")
        @TemplateCreationParameter(template = "PubSub_Subscription_to_BigQuery", value = "true")
        @Description(
            "This determines whether the template reads from a Pub/sub subscription or a topic")
        @Default.Boolean(false)
        Boolean getUseSubscription();
    
        void setUseSubscription(Boolean value);
    
        @TemplateParameter.BigQueryTable(
            order = 5,
            optional = true,
            description =
                "Table for messages failed to reach the output table (i.e., Deadletter table)",
            helpText =
                "The BigQuery table to use for messages that fail to reach the output table, in the format of `<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>`. If the table doesn't exist, it is created during pipeline execution. If not specified, `OUTPUT_TABLE_SPEC_error_records` is used.")
        ValueProvider<String> getOutputDeadletterTable();
    
        void setOutputDeadletterTable(ValueProvider<String> value);
      }
    
      /**
       * The main entry-point for pipeline execution. This method will start the pipeline but will not
       * wait for its execution to finish. If blocking execution is required, use the {@link
       * PubSubToBigQuery#run(Options)} method to start the pipeline and invoke {@code
       * result.waitUntilFinish()} on the {@link PipelineResult}.
       *
       * @param args The command-line args passed by the executor.
       */
      public static void main(String[] args) {
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    
        run(options);
      }
    
      /**
       * Runs the pipeline to completion with the specified options. This method does not wait until the
       * pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
       * object to block until the pipeline is finished running if blocking programmatic execution is
       * required.
       *
       * @param options The execution options.
       * @return The pipeline result.
       */
      public static PipelineResult run(Options options) {
    
        Pipeline pipeline = Pipeline.create(options);
    
        CoderRegistry coderRegistry = pipeline.getCoderRegistry();
        coderRegistry.registerCoderForType(CODER.getEncodedTypeDescriptor(), CODER);
    
        /*
         * Steps:
         *  1) Read messages in from Pub/Sub
         *  2) Transform the PubsubMessages into TableRows
         *     - Transform message payload via UDF
         *     - Convert UDF result to TableRow objects
         *  3) Write successful records out to BigQuery
         *  4) Write failed records out to BigQuery
         */
    
        /*
         * Step #1: Read messages in from Pub/Sub
         * Either from a Subscription or Topic
         */
    
        PCollection<PubsubMessage> messages = null;
        if (options.getUseSubscription()) {
          messages =
              pipeline.apply(
                  "ReadPubSubSubscription",
                  PubsubIO.readMessagesWithAttributes()
                      .fromSubscription(options.getInputSubscription()));
        } else {
          messages =
              pipeline.apply(
                  "ReadPubSubTopic",
                  PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()));
        }
    
        PCollectionTuple convertedTableRows =
            messages
                /*
                 * Step #2: Transform the PubsubMessages into TableRows
                 */
                .apply("ConvertMessageToTableRow", new PubsubMessageToTableRow(options));
    
        /*
         * Step #3: Write the successful records out to BigQuery
         */
        WriteResult writeResult =
            convertedTableRows
                .get(TRANSFORM_OUT)
                .apply(
                    "WriteSuccessfulRecords",
                    BigQueryIO.writeTableRows()
                        .withoutValidation()
                        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                        .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                        .withExtendedErrorInfo()
                        .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                        .to(options.getOutputTableSpec()));
    
        /*
         * Step 3 Contd.
         * Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
         */
        PCollection<FailsafeElement<String, String>> failedInserts =
            writeResult
                .getFailedInsertsWithErr()
                .apply(
                    "WrapInsertionErrors",
                    MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                        .via((BigQueryInsertError e) -> wrapBigQueryInsertError(e)))
                .setCoder(FAILSAFE_ELEMENT_CODER);
    
        /*
         * Step #4: Write records that failed table row transformation
         * or conversion out to BigQuery deadletter table.
         */
        PCollectionList.of(
                ImmutableList.of(
                    convertedTableRows.get(UDF_DEADLETTER_OUT),
                    convertedTableRows.get(TRANSFORM_DEADLETTER_OUT)))
            .apply("Flatten", Flatten.pCollections())
            .apply(
                "WriteFailedRecords",
                ErrorConverters.WritePubsubMessageErrors.newBuilder()
                    .setErrorRecordsTable(
                        ValueProviderUtils.maybeUseDefaultDeadletterTable(
                            options.getOutputDeadletterTable(),
                            options.getOutputTableSpec(),
                            DEFAULT_DEADLETTER_TABLE_SUFFIX))
                    .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
                    .build());
    
        // 5) Insert records that failed insert into deadletter table
        failedInserts.apply(
            "WriteFailedRecords",
            ErrorConverters.WriteStringMessageErrors.newBuilder()
                .setErrorRecordsTable(
                    ValueProviderUtils.maybeUseDefaultDeadletterTable(
                        options.getOutputDeadletterTable(),
                        options.getOutputTableSpec(),
                        DEFAULT_DEADLETTER_TABLE_SUFFIX))
                .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
                .build());
    
        return pipeline.run();
      }
    
      /**
       * If deadletterTable is available, it is returned as is, otherwise outputTableSpec +
       * defaultDeadLetterTableSuffix is returned instead.
       */
      private static ValueProvider<String> maybeUseDefaultDeadletterTable(
          ValueProvider<String> deadletterTable,
          ValueProvider<String> outputTableSpec,
          String defaultDeadLetterTableSuffix) {
        return DualInputNestedValueProvider.of(
            deadletterTable,
            outputTableSpec,
            new SerializableFunction<TranslatorInput<String, String>, String>() {
              @Override
              public String apply(TranslatorInput<String, String> input) {
                String userProvidedTable = input.getX();
                String outputTableSpec = input.getY();
                if (userProvidedTable == null) {
                  return outputTableSpec + defaultDeadLetterTableSuffix;
                }
                return userProvidedTable;
              }
            });
      }
    
      /**
       * The {@link PubsubMessageToTableRow} class is a {@link PTransform} which transforms incoming
       * {@link PubsubMessage} objects into {@link TableRow} objects for insertion into BigQuery while
       * applying an optional UDF to the input. The execution of the UDF and transformation to {@link
       * TableRow} objects is done in a fail-safe way by wrapping the element with its original payload
       * inside the {@link FailsafeElement} class. The {@link PubsubMessageToTableRow} transform will
       * output a {@link PCollectionTuple} which contains all output and dead-letter {@link
       * PCollection}.
       *
       * <p>The {@link PCollectionTuple} output will contain the following {@link PCollection}:
       *
       * <ul>
       *   <li>{@link PubSubToBigQuery#UDF_OUT} - Contains all {@link FailsafeElement} records
       *       successfully processed by the optional UDF.
       *   <li>{@link PubSubToBigQuery#UDF_DEADLETTER_OUT} - Contains all {@link FailsafeElement}
       *       records which failed processing during the UDF execution.
       *   <li>{@link PubSubToBigQuery#TRANSFORM_OUT} - Contains all records successfully converted from
       *       JSON to {@link TableRow} objects.
       *   <li>{@link PubSubToBigQuery#TRANSFORM_DEADLETTER_OUT} - Contains all {@link FailsafeElement}
       *       records which couldn't be converted to table rows.
       * </ul>
       */
      static class PubsubMessageToTableRow
          extends PTransform<PCollection<PubsubMessage>, PCollectionTuple> {
    
        private final Options options;
    
        PubsubMessageToTableRow(Options options) {
          this.options = options;
        }
    
        @Override
        public PCollectionTuple expand(PCollection<PubsubMessage> input) {
    
          PCollectionTuple udfOut =
              input
                  // Map the incoming messages into FailsafeElements so we can recover from failures
                  // across multiple transforms.
                  .apply("MapToRecord", ParDo.of(new PubsubMessageToFailsafeElementFn()))
                  .apply(
                      "InvokeUDF",
                      FailsafeJavascriptUdf.<PubsubMessage>newBuilder()
                          .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
                          .setFunctionName(options.getJavascriptTextTransformFunctionName())
                          .setReloadIntervalMinutes(
                              options.getJavascriptTextTransformReloadIntervalMinutes())
                          .setSuccessTag(UDF_OUT)
                          .setFailureTag(UDF_DEADLETTER_OUT)
                          .build());
    
          // Convert the records which were successfully processed by the UDF into TableRow objects.
          PCollectionTuple jsonToTableRowOut =
              udfOut
                  .get(UDF_OUT)
                  .apply(
                      "JsonToTableRow",
                      FailsafeJsonToTableRow.<PubsubMessage>newBuilder()
                          .setSuccessTag(TRANSFORM_OUT)
                          .setFailureTag(TRANSFORM_DEADLETTER_OUT)
                          .build());
    
          // Re-wrap the PCollections so we can return a single PCollectionTuple
          return PCollectionTuple.of(UDF_OUT, udfOut.get(UDF_OUT))
              .and(UDF_DEADLETTER_OUT, udfOut.get(UDF_DEADLETTER_OUT))
              .and(TRANSFORM_OUT, jsonToTableRowOut.get(TRANSFORM_OUT))
              .and(TRANSFORM_DEADLETTER_OUT, jsonToTableRowOut.get(TRANSFORM_DEADLETTER_OUT));
        }
      }
    
      /**
       * The {@link PubsubMessageToFailsafeElementFn} wraps an incoming {@link PubsubMessage} with the
       * {@link FailsafeElement} class so errors can be recovered from and the original message can be
       * output to a error records table.
       */
      static class PubsubMessageToFailsafeElementFn
          extends DoFn<PubsubMessage, FailsafeElement<PubsubMessage, String>> {
        @ProcessElement
        public void processElement(ProcessContext context) {
          PubsubMessage message = context.element();
          context.output(
              FailsafeElement.of(message, new String(message.getPayload(), StandardCharsets.UTF_8)));
        }
      }
    }
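
If you modify the template source, you can also run the pipeline directly with Maven from the DataflowTemplates v1 module instead of launching the staged template. This is a sketch with hypothetical project and bucket names; the flags map to the Options interface above plus standard Beam pipeline options:

    # Run from the DataflowTemplates v1 directory.
    mvn compile exec:java \
        -Dexec.mainClass=com.google.cloud.teleport.templates.PubSubToBigQuery \
        -Dexec.args=" \
            --project=my-project \
            --stagingLocation=gs://my-bucket/staging \
            --tempLocation=gs://my-bucket/temp \
            --runner=DataflowRunner \
            --useSubscription=true \
            --inputSubscription=projects/my-project/subscriptions/my-subscription \
            --outputTableSpec=my-project:my_dataset.my_table"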
    

What's next