Pub/Sub Proto から BigQuery への Python UDF を含むテンプレート

Pub/Sub proto to BigQuery テンプレートは、Pub/Sub サブスクリプションから BigQuery テーブルに proto データを取り込むストリーミング パイプラインです。BigQuery テーブルへの書き込み中に発生したエラーは、Pub/Sub 未処理トピックにストリーミングされます。

データを変換するための Python ユーザー定義関数(UDF)を指定できます。UDF の実行中のエラーは、個別の Pub/Sub トピック、または BigQuery エラーと同じ未処理のトピックに送信できます。

パイプラインの要件

  • 入力 Pub/Sub サブスクリプションが存在していること。
  • Proto レコードのスキーマ ファイルが、Cloud Storage に存在していること。
  • 出力 Pub/Sub トピックが存在していること。
  • 出力 BigQuery データセットが存在していること。
  • BigQuery テーブルが存在する場合は、createDisposition 値にかかわらず、proto データに一致するスキーマが必要です。

テンプレートのパラメータ

パラメータ 説明
protoSchemaPath 自己完結型の proto スキーマ ファイルの Cloud Storage の場所。例: gs://path/to/my/file.pbこのファイルは、protoc コマンドの --descriptor_set_out フラグを使用して生成できます。--include_imports フラグを使用すると、ファイルが自己完結することを保証できます。
fullMessageName proto メッセージの完全な名前。たとえば、package.name.MessageName の場合、package.name は、java_package ステートメントではなく、package ステートメントに指定された値です。
inputSubscription 読み取り元の Pub/Sub 入力サブスクリプション。例: projects/<project>/subscriptions/<subscription>
outputTopic 未処理レコードに使用する Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name>
outputTableSpec BigQuery 出力テーブルの場所。例: my-project:my_dataset.my_tablecreateDisposition の指定によっては、入力スキーマ ファイルを使用して出力テーブルが自動的に作成されます。
preserveProtoFieldNames 省略可: JSON で元の Proto フィールド名を保持する場合は true を指定します。false にすると、標準の JSON 名を使用されます。たとえば、false と指定すると field_namefieldName に変更されます。(デフォルト: false
bigQueryTableSchemaPath 省略可: BigQuery スキーマパスへの Cloud Storage パス。たとえば、gs://path/to/my/schema.json のようにします。指定されていない場合、スキーマは Proto スキーマから推測されます。
pythonExternalTextTransformGcsPath 省略可: 使用するユーザー定義関数(UDF)を定義する Python コードファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.py
pythonExternalTextTransformFunctionName 省略可: 使用する Python ユーザー定義関数(UDF)の名前。
udfOutputTopic 省略可: UDF エラーを保存する Pub/Sub トピック。たとえば、projects/<project-id>/topics/<topic-name> です。指定されていない場合、UDF エラーは outputTopic と同じトピックに送信されます。
writeDisposition 省略可: BigQuery WriteDispositionWRITE_APPENDWRITE_EMPTYWRITE_TRUNCATE など。デフォルト: WRITE_APPEND
createDisposition 省略可: BigQuery CreateDisposition。例: CREATE_IF_NEEDEDCREATE_NEVERデフォルト: CREATE_IF_NEEDED
useStorageWriteApi 省略可: true の場合、パイプラインは BigQuery Storage Write API を使用します。デフォルト値は false です。詳細については、Storage Write API の使用をご覧ください。
useStorageWriteApiAtLeastOnce 省略可: Storage Write API を使用する場合は、書き込みセマンティクスを指定します。at-least-once セマンティクスを使用するには、このパラメータを true に設定します。1 回限りのセマンティクスを使用するには、パラメータを false に設定します。このパラメータは、useStorageWriteApitrue の場合にのみ適用されます。デフォルト値は false です。
numStorageWriteApiStreams 省略可: Storage Write API を使用する場合は、書き込みストリームの数を指定します。useStorageWriteApitrue で、useStorageWriteApiAtLeastOncefalse の場合に、このパラメータを設定する必要があります。
storageWriteApiTriggeringFrequencySec 省略可: Storage Write API を使用する場合は、トリガーの頻度を秒単位で指定します。useStorageWriteApitrue で、useStorageWriteApiAtLeastOncefalse の場合に、このパラメータを設定する必要があります。

ユーザー定義関数

必要であれば、ユーザー定義関数(UDF)を記述して、このテンプレートを拡張できます。このテンプレートは入力要素ごとに UDF を呼び出します。要素のペイロードは、JSON 文字列としてシリアル化されます。詳細については、Dataflow テンプレートのユーザー定義関数を作成するをご覧ください。

関数の仕様

UDF の仕様は次のとおりです。

  • 入力: JSON 文字列としてシリアル化された Pub/Sub メッセージ データ フィールド。
  • 出力: BigQuery 宛先テーブルのスキーマに一致する JSON 文字列。
  • テンプレートを実行する

    コンソールgcloudAPI
    1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
    2. [テンプレートからジョブを作成] に移動
    3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
    4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

      Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

    5. [Dataflow テンプレート] プルダウン メニューから、[ the Pub/Sub Proto to BigQuery with Python UDF template] を選択します。
    6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
    7. [ジョブを実行] をクリックします。

    シェルまたはターミナルで、テンプレートを実行します。

    gcloud dataflow flex-template run JOB_NAME \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Xlang \
        --parameters \
    schemaPath=SCHEMA_PATH,\
    fullMessageName=PROTO_MESSAGE_NAME,\
    inputSubscription=SUBSCRIPTION_NAME,\
    outputTableSpec=BIGQUERY_TABLE,\
    outputTopic=UNPROCESSED_TOPIC
      

    次のように置き換えます。

    • JOB_NAME: 一意の任意のジョブ名
    • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
    • VERSION: 使用するテンプレートのバージョン

      使用できる値は次のとおりです。

      • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
      • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
    • SCHEMA_PATH: Proto スキーマ ファイルへの Cloud Storage パス(例: gs://MyBucket/file.pb
    • PROTO_MESSAGE_NAME: Proto メッセージ名(例: package.name.MessageName
    • SUBSCRIPTION_NAME: Pub/Sub 入力サブスクリプション名
    • BIGQUERY_TABLE: BigQuery 出力テーブル名
    • UNPROCESSED_TOPIC: 未処理のキューに使用する Pub/Sub トピック

    REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Xlang",
          "parameters": {
              "schemaPath": "SCHEMA_PATH",
              "fullMessageName": "PROTO_MESSAGE_NAME",
              "inputSubscription": "SUBSCRIPTION_NAME",
              "outputTableSpec": "BIGQUERY_TABLE",
              "outputTopic": "UNPROCESSED_TOPIC"
          }
       }
    }
      

    次のように置き換えます。

    • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
    • JOB_NAME: 一意の任意のジョブ名
    • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
    • VERSION: 使用するテンプレートのバージョン

      使用できる値は次のとおりです。

      • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
      • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
    • SCHEMA_PATH: Proto スキーマ ファイルへの Cloud Storage パス(例: gs://MyBucket/file.pb
    • PROTO_MESSAGE_NAME: Proto メッセージ名(例: package.name.MessageName
    • SUBSCRIPTION_NAME: Pub/Sub 入力サブスクリプション名
    • BIGQUERY_TABLE: BigQuery 出力テーブル名
    • UNPROCESSED_TOPIC: 未処理のキューに使用する Pub/Sub トピック
    Java
    /*
     * Copyright (C) 2021 Google LLC
     *
     * Licensed under the Apache License, Version 2.0 (the "License"); you may not
     * use this file except in compliance with the License. You may obtain a copy of
     * the License at
     *
     *   http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
     * License for the specific language governing permissions and limitations under
     * the License.
     */
    package com.google.cloud.teleport.v2.templates;
    
    import static com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer.FailsafeRowPythonExternalUdf.stringMappingFunction;
    import static java.nio.charset.StandardCharsets.UTF_8;
    
    import com.google.cloud.teleport.metadata.MultiTemplate;
    import com.google.cloud.teleport.metadata.Template;
    import com.google.cloud.teleport.metadata.TemplateCategory;
    import com.google.cloud.teleport.metadata.TemplateParameter;
    import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
    import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
    import com.google.cloud.teleport.v2.options.BigQueryCommonOptions.WriteOptions;
    import com.google.cloud.teleport.v2.options.BigQueryStorageApiStreamingOptions;
    import com.google.cloud.teleport.v2.options.PubsubCommonOptions.ReadSubscriptionOptions;
    import com.google.cloud.teleport.v2.options.PubsubCommonOptions.WriteTopicOptions;
    import com.google.cloud.teleport.v2.templates.PubsubProtoToBigQuery.PubSubProtoToBigQueryOptions;
    import com.google.cloud.teleport.v2.transforms.BigQueryConverters;
    import com.google.cloud.teleport.v2.transforms.ErrorConverters;
    import com.google.cloud.teleport.v2.transforms.FailsafeElementTransforms.ConvertFailsafeElementToPubsubMessage;
    import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.FailsafeJavascriptUdf;
    import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer;
    import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer.RowToStringFailsafeElementFn;
    import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
    import com.google.cloud.teleport.v2.utils.GCSUtils;
    import com.google.cloud.teleport.v2.utils.SchemaUtils;
    import com.google.cloud.teleport.v2.values.FailsafeElement;
    import com.google.common.annotations.VisibleForTesting;
    import com.google.common.base.Strings;
    import com.google.protobuf.Descriptors.Descriptor;
    import com.google.protobuf.DynamicMessage;
    import com.google.protobuf.InvalidProtocolBufferException;
    import com.google.protobuf.util.JsonFormat;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.coders.NullableCoder;
    import org.apache.beam.sdk.coders.RowCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
    import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.Read;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.options.Validation.Required;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.TupleTag;
    import org.apache.beam.sdk.values.TupleTagList;
    import org.apache.beam.sdk.values.TypeDescriptor;
    import org.apache.beam.sdk.values.TypeDescriptors;
    import org.apache.commons.lang3.ArrayUtils;
    
    /**
     * A template for writing <a href="https://developers.google.com/protocol-buffers">Protobuf</a>
     * records from Pub/Sub to BigQuery.
     *
     * <p>Persistent failures are written to a Pub/Sub unprocessed topic.
     *
     * <p>Check out <a
     * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/pubsub-binary-to-bigquery/README_PubSub_Proto_to_BigQuery.md">README</a>
     * for instructions on how to use or modify this template.
     */
    @MultiTemplate({
      @Template(
          name = "PubSub_Proto_to_BigQuery_Flex",
          category = TemplateCategory.STREAMING,
          displayName = "Pub/Sub Proto to BigQuery",
          description = {
            "The Pub/Sub proto to BigQuery template is a streaming pipeline that ingests proto data from a Pub/Sub subscription into a BigQuery table. "
                + "Any errors that occur while writing to the BigQuery table are streamed into a Pub/Sub unprocessed topic.\n",
            "A JavaScript user-defined function (UDF) can be provided to transform data. "
                + "Errors while executing the UDF can be sent to either a separate Pub/Sub topic or the same unprocessed topic as the BigQuery errors."
          },
          skipOptions = {
            "pythonExternalTextTransformGcsPath",
            "pythonExternalTextTransformFunctionName"
          },
          optionsClass = PubSubProtoToBigQueryOptions.class,
          flexContainerName = "pubsub-proto-to-bigquery",
          documentation =
              "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-proto-to-bigquery",
          contactInformation = "https://cloud.google.com/support",
          requirements = {
            "The input Pub/Sub subscription must exist.",
            "The schema file for the Proto records must exist on Cloud Storage.",
            "The output Pub/Sub topic must exist.",
            "The output BigQuery dataset must exist.",
            "If the BigQuery table exists, it must have a schema matching the proto data regardless of the <code>createDisposition</code> value."
          },
          streaming = true,
          supportsAtLeastOnce = true),
      @Template(
          name = "PubSub_Proto_to_BigQuery_Xlang",
          category = TemplateCategory.STREAMING,
          displayName = "Pub/Sub Proto to BigQuery with Python UDF",
          type = Template.TemplateType.XLANG,
          description = {
            "The Pub/Sub proto to BigQuery template is a streaming pipeline that ingests proto data from a Pub/Sub subscription into a BigQuery table. "
                + "Any errors that occur while writing to the BigQuery table are streamed into a Pub/Sub unprocessed topic.\n",
            "A Python user-defined function (UDF) can be provided to transform data. "
                + "Errors while executing the UDF can be sent to either a separate Pub/Sub topic or the same unprocessed topic as the BigQuery errors."
          },
          skipOptions = {
            "javascriptTextTransformGcsPath",
            "javascriptTextTransformFunctionName",
            "javascriptTextTransformReloadIntervalMinutes"
          },
          optionsClass = PubSubProtoToBigQueryOptions.class,
          flexContainerName = "pubsub-proto-to-bigquery-xlang",
          documentation =
              "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-proto-to-bigquery",
          contactInformation = "https://cloud.google.com/support",
          requirements = {
            "The input Pub/Sub subscription must exist.",
            "The schema file for the Proto records must exist on Cloud Storage.",
            "The output Pub/Sub topic must exist.",
            "The output BigQuery dataset must exist.",
            "If the BigQuery table exists, it must have a schema matching the proto data regardless of the <code>createDisposition</code> value."
          },
          streaming = true,
          supportsAtLeastOnce = true)
    })
    public final class PubsubProtoToBigQuery {
      private static final TupleTag<FailsafeElement<String, String>> UDF_SUCCESS_TAG = new TupleTag<>();
      private static final TupleTag<FailsafeElement<String, String>> UDF_FAILURE_TAG = new TupleTag<>();
    
      private static final FailsafeElementCoder<String, String> FAILSAFE_CODER =
          FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
    
      public static void main(String[] args) {
        UncaughtExceptionLogger.register();
        run(PipelineOptionsFactory.fromArgs(args).as(PubSubProtoToBigQueryOptions.class));
      }
    
      /** {@link org.apache.beam.sdk.options.PipelineOptions} for {@link PubsubProtoToBigQuery}. */
      public interface PubSubProtoToBigQueryOptions
          extends ReadSubscriptionOptions,
              WriteOptions,
              WriteTopicOptions,
              PythonExternalTextTransformer.PythonExternalTextTransformerOptions,
              BigQueryStorageApiStreamingOptions {
    
        @TemplateParameter.GcsReadFile(
            order = 1,
            description = "Cloud Storage Path to the Proto Schema File",
            helpText =
                "The Cloud Storage location of the self-contained proto schema file. For example,"
                    + " gs://path/to/my/file.pb. You can generate this file with"
                    + " the `--descriptor_set_out` flag of the protoc command."
                    + " The `--include_imports` flag guarantees that the file is self-contained.")
        @Required
        String getProtoSchemaPath();
    
        void setProtoSchemaPath(String value);
    
        @TemplateParameter.Text(
            order = 2,
            regexes = {"^.+([a-zA-Z][a-zA-Z0-9_]+\\.?)+[a-zA-Z0-9_]$"},
            description = "Full Proto Message Name",
            helpText =
                "The full proto message name. For example, `package.name`."
                    + " `MessageName`, where `package.name` is the value provided for the"
                    + " `package` statement and not the `java_package` statement.")
        @Required
        String getFullMessageName();
    
        void setFullMessageName(String value);
    
        @TemplateParameter.Boolean(
            order = 3,
            optional = true,
            description = "Preserve Proto Field Names",
            helpText =
                "To preserve the original proto field name in JSON, set this property to true. "
                    + "To use more standard JSON names, set to false."
                    + " For example, `false` would change `field_name` to `fieldName`. Defaults to: false.")
        @Default.Boolean(false)
        Boolean getPreserveProtoFieldNames();
    
        void setPreserveProtoFieldNames(Boolean value);
    
        @TemplateParameter.GcsReadFile(
            order = 4,
            optional = true,
            description = "BigQuery Table Schema Path",
            helpText =
                "The Cloud Storage path to the BigQuery schema path. "
                    + "If this value isn't provided, then the schema is inferred from the Proto schema.",
            example = "gs://MyBucket/bq_schema.json")
        String getBigQueryTableSchemaPath();
    
        void setBigQueryTableSchemaPath(String value);
    
        @TemplateParameter.PubsubTopic(
            order = 5,
            optional = true,
            description = "Pub/Sub output topic for UDF failures",
            helpText =
                "The Pub/Sub topic storing the UDF errors."
                    + " If this value isn't provided, UDF errors are sent to the same topic as `outputTopic`.",
            example = "projects/your-project-id/topics/your-topic-name")
        String getUdfOutputTopic();
    
        void setUdfOutputTopic(String udfOutputTopic);
    
        // Hide the UseStorageWriteApiAtLeastOnce in the UI, because it will automatically be turned
        // on when pipeline is running on ALO mode and using the Storage Write API
        @TemplateParameter.Boolean(
            order = 6,
            optional = true,
            parentName = "useStorageWriteApi",
            parentTriggerValues = {"true"},
            description = "Use at at-least-once semantics in BigQuery Storage Write API",
            helpText =
                "When using the Storage Write API, specifies the write semantics."
                    + " To use at-least-once semantics (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), set this parameter to true`. To use exactly-once semantics, set the parameter to `false`."
                    + " This parameter applies only when `useStorageWriteApi` is `true`. The default value is `false`.",
            hiddenUi = true)
        @Default.Boolean(false)
        @Override
        Boolean getUseStorageWriteApiAtLeastOnce();
    
        void setUseStorageWriteApiAtLeastOnce(Boolean value);
      }
    
      /** Runs the pipeline and returns the results. */
      private static PipelineResult run(PubSubProtoToBigQueryOptions options) {
        BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);
    
        Pipeline pipeline = Pipeline.create(options);
    
        Descriptor descriptor = getDescriptor(options);
        PCollection<String> maybeForUdf =
            pipeline
                .apply("Read From Pubsub", readPubsubMessages(options, descriptor))
                .apply("Dynamic Message to TableRow", new ConvertDynamicProtoMessageToJson(options));
    
        WriteResult writeResult =
            runUdf(maybeForUdf, options)
                .apply("Write to BigQuery", writeToBigQuery(options, descriptor));
        BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
            .apply(
                "Create Error Payload",
                ErrorConverters.BigQueryInsertErrorToPubsubMessage.<String>newBuilder()
                    .setPayloadCoder(StringUtf8Coder.of())
                    .setTranslateFunction(BigQueryConverters::tableRowToJson)
                    .build())
            .apply("Write Failed BQ Records", PubsubIO.writeMessages().to(options.getOutputTopic()));
    
        return pipeline.run();
      }
    
      /** Gets the {@link Descriptor} for the message type in the Pub/Sub topic. */
      @VisibleForTesting
      static Descriptor getDescriptor(PubSubProtoToBigQueryOptions options) {
        String schemaPath = options.getProtoSchemaPath();
        String messageName = options.getFullMessageName();
        Descriptor descriptor = SchemaUtils.getProtoDomain(schemaPath).getDescriptor(messageName);
    
        if (descriptor == null) {
          throw new IllegalArgumentException(
              messageName + " is not a recognized message in " + schemaPath);
        }
    
        return descriptor;
      }
    
      /** Returns the {@link PTransform} for reading Pub/Sub messages. */
      private static Read<DynamicMessage> readPubsubMessages(
          PubSubProtoToBigQueryOptions options, Descriptor descriptor) {
        return PubsubIO.readProtoDynamicMessages(descriptor)
            .fromSubscription(options.getInputSubscription())
            .withDeadLetterTopic(options.getOutputTopic());
      }
    
      /**
       * Writes messages to BigQuery, creating the table if necessary and allowed in {@code options}.
       *
       * <p>The BigQuery schema will be inferred from {@code descriptor} unless a JSON schema path is
       * specified in {@code options}.
       */
      @VisibleForTesting
      static Write<String> writeToBigQuery(
          PubSubProtoToBigQueryOptions options, Descriptor descriptor) {
        Write<String> write =
            BigQueryConverters.<String>createWriteTransform(options)
                .withFormatFunction(BigQueryConverters::convertJsonToTableRow);
    
        String schemaPath = options.getBigQueryTableSchemaPath();
        if (Strings.isNullOrEmpty(schemaPath)) {
          return write.withSchema(
              SchemaUtils.createBigQuerySchema(descriptor, options.getPreserveProtoFieldNames()));
        } else {
          return write.withJsonSchema(GCSUtils.getGcsFileAsString(schemaPath));
        }
      }
    
      /** {@link PTransform} that handles converting {@link PubsubMessage} values to JSON. */
      private static class ConvertDynamicProtoMessageToJson
          extends PTransform<PCollection<DynamicMessage>, PCollection<String>> {
        private final boolean preserveProtoName;
    
        private ConvertDynamicProtoMessageToJson(PubSubProtoToBigQueryOptions options) {
          this.preserveProtoName = options.getPreserveProtoFieldNames();
        }
    
        @Override
        public PCollection<String> expand(PCollection<DynamicMessage> input) {
          return input.apply(
              "Map to JSON",
              MapElements.into(TypeDescriptors.strings())
                  .via(
                      message -> {
                        try {
                          JsonFormat.Printer printer = JsonFormat.printer();
                          return preserveProtoName
                              ? printer.preservingProtoFieldNames().print(message)
                              : printer.print(message);
                        } catch (InvalidProtocolBufferException e) {
                          throw new RuntimeException(e);
                        }
                      }));
        }
      }
    
      /**
       * Handles running the UDF.
       *
       * <p>If {@code options} is configured so as not to run the UDF, then the UDF will not be called.
       *
       * <p>This may add a branch to the pipeline for outputting failed UDF records to an unprocessed
       * topic.
       *
       * @param jsonCollection {@link PCollection} of JSON strings for use as input to the UDF
       * @param options the options containing info on running the UDF
       * @return the {@link PCollection} of UDF output as JSON or {@code jsonCollection} if UDF not
       *     called
       */
      @VisibleForTesting
      static PCollection<String> runUdf(
          PCollection<String> jsonCollection, PubSubProtoToBigQueryOptions options) {
    
        boolean useJavascriptUdf = !Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath());
        boolean usePythonUdf = !Strings.isNullOrEmpty(options.getPythonExternalTextTransformGcsPath());
    
        // In order to avoid generating a graph that makes it look like a UDF was called when none was
        // intended, simply return the input as "success" output.
        if (!useJavascriptUdf && !usePythonUdf) {
          return jsonCollection;
        }
    
        // For testing purposes, we need to do this check before creating the PTransform rather than
        // in `expand`. Otherwise, we get a NullPointerException due to the PTransform not returning
        // a value.
        if (useJavascriptUdf
            && Strings.isNullOrEmpty(options.getJavascriptTextTransformFunctionName())) {
          throw new IllegalArgumentException(
              "JavaScript function name cannot be null or empty if file is set");
        }
        if (usePythonUdf
            && Strings.isNullOrEmpty(options.getPythonExternalTextTransformFunctionName())) {
          throw new IllegalArgumentException(
              "Python function name cannot be null or empty if file is set");
        }
        if (usePythonUdf && useJavascriptUdf) {
          throw new IllegalArgumentException(
              "Either javascript or Python gcs path must be provided, but not both.");
        }
    
        PCollectionTuple maybeSuccess;
        if (usePythonUdf) {
          maybeSuccess = jsonCollection.apply("Run UDF", new RunPythonUdf(options));
        } else {
          maybeSuccess = jsonCollection.apply("Run UDF", new RunUdf(options));
        }
    
        maybeSuccess
            .get(UDF_FAILURE_TAG)
            .setCoder(FAILSAFE_CODER)
            .apply(
                "Get UDF Failures",
                ConvertFailsafeElementToPubsubMessage.<String, String>builder()
                    .setOriginalPayloadSerializeFn(s -> ArrayUtils.toObject(s.getBytes(UTF_8)))
                    .setErrorMessageAttributeKey("udfErrorMessage")
                    .build())
            .apply("Write Failed UDF", writeUdfFailures(options));
    
        return maybeSuccess
            .get(UDF_SUCCESS_TAG)
            .setCoder(FAILSAFE_CODER)
            .apply(
                "Get UDF Output",
                MapElements.into(TypeDescriptors.strings()).via(FailsafeElement::getPayload))
            .setCoder(NullableCoder.of(StringUtf8Coder.of()));
      }
    
      /** {@link PTransform} that calls a UDF and returns both success and failure output. */
      private static class RunUdf extends PTransform<PCollection<String>, PCollectionTuple> {
        private final PubSubProtoToBigQueryOptions options;
    
        RunUdf(PubSubProtoToBigQueryOptions options) {
          this.options = options;
        }
    
        @Override
        public PCollectionTuple expand(PCollection<String> input) {
          return input
              .apply("Prepare Failsafe UDF", makeFailsafe())
              .setCoder(FAILSAFE_CODER)
              .apply(
                  "Call UDF",
                  FailsafeJavascriptUdf.<String>newBuilder()
                      .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
                      .setFunctionName(options.getJavascriptTextTransformFunctionName())
                      .setReloadIntervalMinutes(
                          options.getJavascriptTextTransformReloadIntervalMinutes())
                      .setSuccessTag(UDF_SUCCESS_TAG)
                      .setFailureTag(UDF_FAILURE_TAG)
                      .build());
        }
    
        private static MapElements<String, FailsafeElement<String, String>> makeFailsafe() {
          return MapElements.into(new TypeDescriptor<FailsafeElement<String, String>>() {})
              .via((String json) -> FailsafeElement.of(json, json));
        }
      }
    
      /** {@link PTransform} that calls a python UDF and returns both success and failure output. */
      private static class RunPythonUdf extends PTransform<PCollection<String>, PCollectionTuple> {
        private final PubSubProtoToBigQueryOptions options;
    
        RunPythonUdf(PubSubProtoToBigQueryOptions options) {
          this.options = options;
        }
    
        @Override
        public PCollectionTuple expand(PCollection<String> input) {
          return input
              .apply("Prepare Failsafe row", stringMappingFunction())
              .setCoder(
                  RowCoder.of(PythonExternalTextTransformer.FailsafeRowPythonExternalUdf.ROW_SCHEMA))
              .apply(
                  "InvokeUDF",
                  PythonExternalTextTransformer.FailsafePythonExternalUdf.newBuilder()
                      .setFileSystemPath(options.getPythonExternalTextTransformGcsPath())
                      .setFunctionName(options.getPythonExternalTextTransformFunctionName())
                      .build())
              .apply(
                  "MapRowsToFailsafeElements",
                  ParDo.of(new RowToStringFailsafeElementFn(UDF_SUCCESS_TAG, UDF_FAILURE_TAG))
                      .withOutputTags(UDF_SUCCESS_TAG, TupleTagList.of(UDF_FAILURE_TAG)));
        }
      }
    
      /**
       * Returns a {@link PubsubIO.Write} configured to write UDF failures to the appropriate output
       * topic.
       */
      private static PubsubIO.Write<PubsubMessage> writeUdfFailures(
          PubSubProtoToBigQueryOptions options) {
        PubsubIO.Write<PubsubMessage> write = PubsubIO.writeMessages();
        return Strings.isNullOrEmpty(options.getUdfOutputTopic())
            ? write.to(options.getOutputTopic())
            : write.to(options.getUdfOutputTopic());
      }
    }
    

    次のステップ