Pub/Sub Proto to BigQuery with Python UDF 템플릿

Pub/Sub proto to BigQuery 템플릿은 Pub/Sub 구독에서 BigQuery 테이블로 proto 데이터를 수집하는 스트리밍 파이프라인입니다. BigQuery 테이블에 쓰는 동안 발생하는 모든 오류는 Pub/Sub 처리되지 않은 주제로 스트리밍됩니다.

Python 사용자 정의 함수(UDF)를 제공하여 데이터를 변환할 수 있습니다. UDF 실행 중 오류는 개별 Pub/Sub 주제 또는 BigQuery 오류와 동일한 미처리 주제로 전송될 수 있습니다.

파이프라인 요구사항

  • 입력 Pub/Sub 구독이 있어야 합니다.
  • Proto 레코드의 스키마 파일이 Cloud Storage에 있어야 합니다.
  • 출력 Pub/Sub 주제가 있어야 합니다.
  • 출력 BigQuery 데이터 세트가 있어야 합니다.
  • BigQuery 테이블이 있으면 createDisposition 값에 관계없이 Proto 데이터와 일치하는 스키마가 있어야 합니다.

템플릿 매개변수

매개변수 설명
protoSchemaPath 자체 포함된 Proto 스키마 파일의 Cloud Storage 위치입니다. 예를 들면 gs://path/to/my/file.pb입니다. 이 파일은 protoc 명령어의 --descriptor_set_out 플래그를 사용하여 생성할 수 있습니다. --include_imports 플래그는 파일을 독립 실행형 파일로 만듭니다.
fullMessageName 전체 Proto 메시지 이름입니다. 예를 들면 package.name.MessageName입니다. 여기서 package.namejava_package 문이 아닌 package 문에 대해 제공된 값입니다.
inputSubscription 읽어올 Pub/Sub 입력 구독입니다. 예를 들면 projects/<project>/subscriptions/<subscription>입니다.
outputTopic 처리되지 않은 레코드에 사용할 Pub/Sub 주제입니다. 예를 들면 projects/<project-id>/topics/<topic-name>입니다.
outputTableSpec BigQuery 출력 테이블 위치입니다. 예를 들면 my-project:my_dataset.my_table입니다. 지정된 createDisposition에 따라 입력 스키마 파일을 사용해서 출력 테이블을 자동으로 만들 수 있습니다.
preserveProtoFieldNames (선택사항) JSON에서 원본 Proto 필드를 보존하기 위한 true입니다. 더 많은 표준 JSON 이름을 사용하기 위한 false입니다. 예를 들어 falsefield_namefieldName으로 바꿉니다. (기본값: false)
bigQueryTableSchemaPath (선택사항) BigQuery 스키마 경로에 대한 Cloud Storage 경로입니다. 예를 들면 gs://path/to/my/schema.json입니다. 제공되지 않는 경우 스키마가 Proto 스키마에서 유추됩니다.
pythonExternalTextTransformGcsPath (선택사항) 사용할 사용자 정의 함수(UDF)를 정의하는 Python 코드 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.py입니다.
pythonExternalTextTransformFunctionName (선택사항) 사용할 Python 사용자 정의 함수(UDF)의 이름입니다.
udfOutputTopic (선택사항) UDF 오류를 저장하는 Pub/Sub 주제입니다. 예를 들면 projects/<project-id>/topics/<topic-name>입니다. 제공되지 않는 경우 UDF 오류가 outputTopic과 동일한 주제로 전송됩니다.
writeDisposition (선택사항) BigQuery WriteDisposition입니다. 예를 들면 WRITE_APPEND, WRITE_EMPTY, WRITE_TRUNCATE입니다. 기본값: WRITE_APPEND
createDisposition (선택사항) BigQuery CreateDisposition입니다. 예를 들면 CREATE_IF_NEEDED, CREATE_NEVER입니다. 기본값: CREATE_IF_NEEDED
useStorageWriteApi (선택사항): true이면 파이프라인에서 BigQuery Storage Write API를 사용합니다. 기본값은 false입니다. 자세한 내용은 Storage Write API 사용을 참조하세요.
useStorageWriteApiAtLeastOnce (선택사항): Storage Write API를 사용할 때 쓰기 시맨틱스를 지정합니다. 1회 이상 실행되는 시맨틱스를 사용하려면 이 매개변수를 true으로 설정합니다. 1회만 실행되는 시맨틱스를 사용하려면 매개변수를 false로 설정합니다. 이 매개변수는 useStorageWriteApitrue인 경우에만 적용됩니다. 기본값은 false입니다.
numStorageWriteApiStreams (선택사항): Storage Write API를 사용할 때 쓰기 스트림 수를 지정합니다. useStorageWriteApitrue이고 useStorageWriteApiAtLeastOncefalse이면 이 매개변수를 설정해야 합니다.
storageWriteApiTriggeringFrequencySec (선택사항): Storage Write API를 사용할 때 트리거 빈도를 초 단위로 지정합니다. useStorageWriteApitrue이고 useStorageWriteApiAtLeastOncefalse이면 이 매개변수를 설정해야 합니다.

사용자 정의 함수

선택적으로 사용자 정의 함수(UDF)를 작성하여 이 템플릿을 확장할 수 있습니다. 템플릿이 각 입력 요소에 대해 UDF를 호출합니다. 요소 페이로드는 JSON 문자열로 직렬화됩니다. 자세한 내용은 Dataflow 템플릿에 대한 사용자 정의 함수 만들기를 참조하세요.

함수 사양

UDF의 사양은 다음과 같습니다.

  • 입력: JSON 문자열로 직렬화된 Pub/Sub 메시지 데이터 필드입니다.
  • 출력: BigQuery 대상 테이블의 스키마와 일치하는 JSON 문자열입니다.
  • 템플릿 실행

    콘솔gcloudAPI
    1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
    2. 템플릿에서 작업 만들기로 이동
    3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
    4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

      Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

    5. Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub Proto to BigQuery with Python UDF template을 선택합니다.
    6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
    7. 작업 실행을 클릭합니다.

    셸 또는 터미널에서 템플릿을 실행합니다.

    gcloud dataflow flex-template run JOB_NAME \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Xlang \
        --parameters \
    schemaPath=SCHEMA_PATH,\
    fullMessageName=PROTO_MESSAGE_NAME,\
    inputSubscription=SUBSCRIPTION_NAME,\
    outputTableSpec=BIGQUERY_TABLE,\
    outputTopic=UNPROCESSED_TOPIC
      

    다음을 바꿉니다.

    • JOB_NAME: 선택한 고유한 작업 이름
    • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
    • VERSION: 사용할 템플릿 버전

      다음 값을 사용할 수 있습니다.

    • SCHEMA_PATH: Proto 스키마 파일의 Cloud Storage 경로(예: gs://MyBucket/file.pb).
    • PROTO_MESSAGE_NAME: Proto 메시지 이름(예: package.name.MessageName)
    • SUBSCRIPTION_NAME: Pub/Sub 입력 구독 이름
    • BIGQUERY_TABLE: BigQuery 출력 테이블 이름
    • UNPROCESSED_TOPIC: 처리되지 않은 큐에 사용할 Pub/Sub 주제

    REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Xlang",
          "parameters": {
              "schemaPath": "SCHEMA_PATH",
              "fullMessageName": "PROTO_MESSAGE_NAME",
              "inputSubscription": "SUBSCRIPTION_NAME",
              "outputTableSpec": "BIGQUERY_TABLE",
              "outputTopic": "UNPROCESSED_TOPIC"
          }
       }
    }
      

    다음을 바꿉니다.

    • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
    • JOB_NAME: 선택한 고유한 작업 이름
    • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
    • VERSION: 사용할 템플릿 버전

      다음 값을 사용할 수 있습니다.

    • SCHEMA_PATH: Proto 스키마 파일의 Cloud Storage 경로(예: gs://MyBucket/file.pb).
    • PROTO_MESSAGE_NAME: Proto 메시지 이름(예: package.name.MessageName)
    • SUBSCRIPTION_NAME: Pub/Sub 입력 구독 이름
    • BIGQUERY_TABLE: BigQuery 출력 테이블 이름
    • UNPROCESSED_TOPIC: 처리되지 않은 큐에 사용할 Pub/Sub 주제
    Java
    /*
     * Copyright (C) 2021 Google LLC
     *
     * Licensed under the Apache License, Version 2.0 (the "License"); you may not
     * use this file except in compliance with the License. You may obtain a copy of
     * the License at
     *
     *   http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
     * License for the specific language governing permissions and limitations under
     * the License.
     */
    package com.google.cloud.teleport.v2.templates;
    
    import static com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer.FailsafeRowPythonExternalUdf.stringMappingFunction;
    import static java.nio.charset.StandardCharsets.UTF_8;
    
    import com.google.cloud.teleport.metadata.MultiTemplate;
    import com.google.cloud.teleport.metadata.Template;
    import com.google.cloud.teleport.metadata.TemplateCategory;
    import com.google.cloud.teleport.metadata.TemplateParameter;
    import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
    import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
    import com.google.cloud.teleport.v2.options.BigQueryCommonOptions.WriteOptions;
    import com.google.cloud.teleport.v2.options.BigQueryStorageApiStreamingOptions;
    import com.google.cloud.teleport.v2.options.PubsubCommonOptions.ReadSubscriptionOptions;
    import com.google.cloud.teleport.v2.options.PubsubCommonOptions.WriteTopicOptions;
    import com.google.cloud.teleport.v2.templates.PubsubProtoToBigQuery.PubSubProtoToBigQueryOptions;
    import com.google.cloud.teleport.v2.transforms.BigQueryConverters;
    import com.google.cloud.teleport.v2.transforms.ErrorConverters;
    import com.google.cloud.teleport.v2.transforms.FailsafeElementTransforms.ConvertFailsafeElementToPubsubMessage;
    import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.FailsafeJavascriptUdf;
    import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer;
    import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer.RowToStringFailsafeElementFn;
    import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
    import com.google.cloud.teleport.v2.utils.GCSUtils;
    import com.google.cloud.teleport.v2.utils.SchemaUtils;
    import com.google.cloud.teleport.v2.values.FailsafeElement;
    import com.google.common.annotations.VisibleForTesting;
    import com.google.common.base.Strings;
    import com.google.protobuf.Descriptors.Descriptor;
    import com.google.protobuf.DynamicMessage;
    import com.google.protobuf.InvalidProtocolBufferException;
    import com.google.protobuf.util.JsonFormat;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.coders.NullableCoder;
    import org.apache.beam.sdk.coders.RowCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
    import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.Read;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.options.Validation.Required;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.TupleTag;
    import org.apache.beam.sdk.values.TupleTagList;
    import org.apache.beam.sdk.values.TypeDescriptor;
    import org.apache.beam.sdk.values.TypeDescriptors;
    import org.apache.commons.lang3.ArrayUtils;
    
    /**
     * A template for writing <a href="https://developers.google.com/protocol-buffers">Protobuf</a>
     * records from Pub/Sub to BigQuery.
     *
     * <p>Persistent failures are written to a Pub/Sub unprocessed topic.
     *
     * <p>Check out <a
     * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/pubsub-binary-to-bigquery/README_PubSub_Proto_to_BigQuery.md">README</a>
     * for instructions on how to use or modify this template.
     */
    @MultiTemplate({
      @Template(
          name = "PubSub_Proto_to_BigQuery_Flex",
          category = TemplateCategory.STREAMING,
          displayName = "Pub/Sub Proto to BigQuery",
          description = {
            "The Pub/Sub proto to BigQuery template is a streaming pipeline that ingests proto data from a Pub/Sub subscription into a BigQuery table. "
                + "Any errors that occur while writing to the BigQuery table are streamed into a Pub/Sub unprocessed topic.\n",
            "A JavaScript user-defined function (UDF) can be provided to transform data. "
                + "Errors while executing the UDF can be sent to either a separate Pub/Sub topic or the same unprocessed topic as the BigQuery errors."
          },
          skipOptions = {
            "pythonExternalTextTransformGcsPath",
            "pythonExternalTextTransformFunctionName"
          },
          optionsClass = PubSubProtoToBigQueryOptions.class,
          flexContainerName = "pubsub-proto-to-bigquery",
          documentation =
              "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-proto-to-bigquery",
          contactInformation = "https://cloud.google.com/support",
          requirements = {
            "The input Pub/Sub subscription must exist.",
            "The schema file for the Proto records must exist on Cloud Storage.",
            "The output Pub/Sub topic must exist.",
            "The output BigQuery dataset must exist.",
            "If the BigQuery table exists, it must have a schema matching the proto data regardless of the <code>createDisposition</code> value."
          },
          streaming = true,
          supportsAtLeastOnce = true),
      @Template(
          name = "PubSub_Proto_to_BigQuery_Xlang",
          category = TemplateCategory.STREAMING,
          displayName = "Pub/Sub Proto to BigQuery with Python UDF",
          type = Template.TemplateType.XLANG,
          description = {
            "The Pub/Sub proto to BigQuery template is a streaming pipeline that ingests proto data from a Pub/Sub subscription into a BigQuery table. "
                + "Any errors that occur while writing to the BigQuery table are streamed into a Pub/Sub unprocessed topic.\n",
            "A Python user-defined function (UDF) can be provided to transform data. "
                + "Errors while executing the UDF can be sent to either a separate Pub/Sub topic or the same unprocessed topic as the BigQuery errors."
          },
          skipOptions = {
            "javascriptTextTransformGcsPath",
            "javascriptTextTransformFunctionName",
            "javascriptTextTransformReloadIntervalMinutes"
          },
          optionsClass = PubSubProtoToBigQueryOptions.class,
          flexContainerName = "pubsub-proto-to-bigquery-xlang",
          documentation =
              "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-proto-to-bigquery",
          contactInformation = "https://cloud.google.com/support",
          requirements = {
            "The input Pub/Sub subscription must exist.",
            "The schema file for the Proto records must exist on Cloud Storage.",
            "The output Pub/Sub topic must exist.",
            "The output BigQuery dataset must exist.",
            "If the BigQuery table exists, it must have a schema matching the proto data regardless of the <code>createDisposition</code> value."
          },
          streaming = true,
          supportsAtLeastOnce = true)
    })
    public final class PubsubProtoToBigQuery {
      private static final TupleTag<FailsafeElement<String, String>> UDF_SUCCESS_TAG = new TupleTag<>();
      private static final TupleTag<FailsafeElement<String, String>> UDF_FAILURE_TAG = new TupleTag<>();
    
      private static final FailsafeElementCoder<String, String> FAILSAFE_CODER =
          FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
    
      public static void main(String[] args) {
        UncaughtExceptionLogger.register();
        run(PipelineOptionsFactory.fromArgs(args).as(PubSubProtoToBigQueryOptions.class));
      }
    
      /** {@link org.apache.beam.sdk.options.PipelineOptions} for {@link PubsubProtoToBigQuery}. */
      public interface PubSubProtoToBigQueryOptions
          extends ReadSubscriptionOptions,
              WriteOptions,
              WriteTopicOptions,
              PythonExternalTextTransformer.PythonExternalTextTransformerOptions,
              BigQueryStorageApiStreamingOptions {
    
        @TemplateParameter.GcsReadFile(
            order = 1,
            description = "Cloud Storage Path to the Proto Schema File",
            helpText =
                "The Cloud Storage location of the self-contained proto schema file. For example,"
                    + " `gs://path/to/my/file.pb`. You can generate this file with"
                    + " the `--descriptor_set_out` flag of the protoc command."
                    + " The `--include_imports` flag guarantees that the file is self-contained.")
        @Required
        String getProtoSchemaPath();
    
        void setProtoSchemaPath(String value);
    
        @TemplateParameter.Text(
            order = 2,
            regexes = {"^.+([a-zA-Z][a-zA-Z0-9_]+\\.?)+[a-zA-Z0-9_]$"},
            description = "Full Proto Message Name",
            helpText =
                "The full proto message name. For example, `package.name`."
                    + " `MessageName`, where `package.name` is the value provided for the"
                    + " `package` statement and not the `java_package` statement.")
        @Required
        String getFullMessageName();
    
        void setFullMessageName(String value);
    
        @TemplateParameter.Boolean(
            order = 3,
            optional = true,
            description = "Preserve Proto Field Names",
            helpText =
                "To preserve the original proto field name in JSON, set this property to `true`. "
                    + "To use more standard JSON names, set to `false`."
                    + " For example, `false` would change `field_name` to `fieldName`. Defaults to: `false`.")
        @Default.Boolean(false)
        Boolean getPreserveProtoFieldNames();
    
        void setPreserveProtoFieldNames(Boolean value);
    
        @TemplateParameter.GcsReadFile(
            order = 4,
            optional = true,
            description = "BigQuery Table Schema Path",
            helpText =
                "The Cloud Storage path to the BigQuery schema path. "
                    + "If this value isn't provided, then the schema is inferred from the Proto schema.",
            example = "gs://MyBucket/bq_schema.json")
        String getBigQueryTableSchemaPath();
    
        void setBigQueryTableSchemaPath(String value);
    
        @TemplateParameter.PubsubTopic(
            order = 5,
            optional = true,
            description = "Pub/Sub output topic for UDF failures",
            helpText =
                "The Pub/Sub topic storing the UDF errors."
                    + " If this value isn't provided, UDF errors are sent to the same topic as `outputTopic`.",
            example = "projects/your-project-id/topics/your-topic-name")
        String getUdfOutputTopic();
    
        void setUdfOutputTopic(String udfOutputTopic);
    
        // Hide the UseStorageWriteApiAtLeastOnce in the UI, because it will automatically be turned
        // on when pipeline is running on ALO mode and using the Storage Write API
        @TemplateParameter.Boolean(
            order = 6,
            optional = true,
            parentName = "useStorageWriteApi",
            parentTriggerValues = {"true"},
            description = "Use at at-least-once semantics in BigQuery Storage Write API",
            helpText =
                "When using the Storage Write API, specifies the write semantics."
                    + " To use at-least-once semantics (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), set this parameter to true`. To use exactly-once semantics, set the parameter to `false`."
                    + " This parameter applies only when `useStorageWriteApi` is `true`. The default value is `false`.",
            hiddenUi = true)
        @Default.Boolean(false)
        @Override
        Boolean getUseStorageWriteApiAtLeastOnce();
    
        void setUseStorageWriteApiAtLeastOnce(Boolean value);
      }
    
      /** Runs the pipeline and returns the results. */
      private static PipelineResult run(PubSubProtoToBigQueryOptions options) {
        BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);
    
        Pipeline pipeline = Pipeline.create(options);
    
        Descriptor descriptor = getDescriptor(options);
        PCollection<String> maybeForUdf =
            pipeline
                .apply("Read From Pubsub", readPubsubMessages(options, descriptor))
                .apply("Dynamic Message to TableRow", new ConvertDynamicProtoMessageToJson(options));
    
        WriteResult writeResult =
            runUdf(maybeForUdf, options)
                .apply("Write to BigQuery", writeToBigQuery(options, descriptor));
        BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
            .apply(
                "Create Error Payload",
                ErrorConverters.BigQueryInsertErrorToPubsubMessage.<String>newBuilder()
                    .setPayloadCoder(StringUtf8Coder.of())
                    .setTranslateFunction(BigQueryConverters::tableRowToJson)
                    .build())
            .apply("Write Failed BQ Records", PubsubIO.writeMessages().to(options.getOutputTopic()));
    
        return pipeline.run();
      }
    
      /** Gets the {@link Descriptor} for the message type in the Pub/Sub topic. */
      @VisibleForTesting
      static Descriptor getDescriptor(PubSubProtoToBigQueryOptions options) {
        String schemaPath = options.getProtoSchemaPath();
        String messageName = options.getFullMessageName();
        Descriptor descriptor = SchemaUtils.getProtoDomain(schemaPath).getDescriptor(messageName);
    
        if (descriptor == null) {
          throw new IllegalArgumentException(
              messageName + " is not a recognized message in " + schemaPath);
        }
    
        return descriptor;
      }
    
      /** Returns the {@link PTransform} for reading Pub/Sub messages. */
      private static Read<DynamicMessage> readPubsubMessages(
          PubSubProtoToBigQueryOptions options, Descriptor descriptor) {
        return PubsubIO.readProtoDynamicMessages(descriptor)
            .fromSubscription(options.getInputSubscription())
            .withDeadLetterTopic(options.getOutputTopic());
      }
    
      /**
       * Writes messages to BigQuery, creating the table if necessary and allowed in {@code options}.
       *
       * <p>The BigQuery schema will be inferred from {@code descriptor} unless a JSON schema path is
       * specified in {@code options}.
       */
      @VisibleForTesting
      static Write<String> writeToBigQuery(
          PubSubProtoToBigQueryOptions options, Descriptor descriptor) {
        Write<String> write =
            BigQueryConverters.<String>createWriteTransform(options)
                .withFormatFunction(BigQueryConverters::convertJsonToTableRow);
    
        String schemaPath = options.getBigQueryTableSchemaPath();
        if (Strings.isNullOrEmpty(schemaPath)) {
          return write.withSchema(
              SchemaUtils.createBigQuerySchema(descriptor, options.getPreserveProtoFieldNames()));
        } else {
          return write.withJsonSchema(GCSUtils.getGcsFileAsString(schemaPath));
        }
      }
    
      /** {@link PTransform} that handles converting {@link PubsubMessage} values to JSON. */
      private static class ConvertDynamicProtoMessageToJson
          extends PTransform<PCollection<DynamicMessage>, PCollection<String>> {
        private final boolean preserveProtoName;
    
        private ConvertDynamicProtoMessageToJson(PubSubProtoToBigQueryOptions options) {
          this.preserveProtoName = options.getPreserveProtoFieldNames();
        }
    
        @Override
        public PCollection<String> expand(PCollection<DynamicMessage> input) {
          return input.apply(
              "Map to JSON",
              MapElements.into(TypeDescriptors.strings())
                  .via(
                      message -> {
                        try {
                          JsonFormat.Printer printer = JsonFormat.printer();
                          return preserveProtoName
                              ? printer.preservingProtoFieldNames().print(message)
                              : printer.print(message);
                        } catch (InvalidProtocolBufferException e) {
                          throw new RuntimeException(e);
                        }
                      }));
        }
      }
    
      /**
       * Handles running the UDF.
       *
       * <p>If {@code options} is configured so as not to run the UDF, then the UDF will not be called.
       *
       * <p>This may add a branch to the pipeline for outputting failed UDF records to an unprocessed
       * topic.
       *
       * @param jsonCollection {@link PCollection} of JSON strings for use as input to the UDF
       * @param options the options containing info on running the UDF
       * @return the {@link PCollection} of UDF output as JSON or {@code jsonCollection} if UDF not
       *     called
       */
      @VisibleForTesting
      static PCollection<String> runUdf(
          PCollection<String> jsonCollection, PubSubProtoToBigQueryOptions options) {
    
        boolean useJavascriptUdf = !Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath());
        boolean usePythonUdf = !Strings.isNullOrEmpty(options.getPythonExternalTextTransformGcsPath());
    
        // In order to avoid generating a graph that makes it look like a UDF was called when none was
        // intended, simply return the input as "success" output.
        if (!useJavascriptUdf && !usePythonUdf) {
          return jsonCollection;
        }
    
        // For testing purposes, we need to do this check before creating the PTransform rather than
        // in `expand`. Otherwise, we get a NullPointerException due to the PTransform not returning
        // a value.
        if (useJavascriptUdf
            && Strings.isNullOrEmpty(options.getJavascriptTextTransformFunctionName())) {
          throw new IllegalArgumentException(
              "JavaScript function name cannot be null or empty if file is set");
        }
        if (usePythonUdf
            && Strings.isNullOrEmpty(options.getPythonExternalTextTransformFunctionName())) {
          throw new IllegalArgumentException(
              "Python function name cannot be null or empty if file is set");
        }
        if (usePythonUdf && useJavascriptUdf) {
          throw new IllegalArgumentException(
              "Either javascript or Python gcs path must be provided, but not both.");
        }
    
        PCollectionTuple maybeSuccess;
        if (usePythonUdf) {
          maybeSuccess = jsonCollection.apply("Run UDF", new RunPythonUdf(options));
        } else {
          maybeSuccess = jsonCollection.apply("Run UDF", new RunUdf(options));
        }
    
        maybeSuccess
            .get(UDF_FAILURE_TAG)
            .setCoder(FAILSAFE_CODER)
            .apply(
                "Get UDF Failures",
                ConvertFailsafeElementToPubsubMessage.<String, String>builder()
                    .setOriginalPayloadSerializeFn(s -> ArrayUtils.toObject(s.getBytes(UTF_8)))
                    .setErrorMessageAttributeKey("udfErrorMessage")
                    .build())
            .apply("Write Failed UDF", writeUdfFailures(options));
    
        return maybeSuccess
            .get(UDF_SUCCESS_TAG)
            .setCoder(FAILSAFE_CODER)
            .apply(
                "Get UDF Output",
                MapElements.into(TypeDescriptors.strings()).via(FailsafeElement::getPayload))
            .setCoder(NullableCoder.of(StringUtf8Coder.of()));
      }
    
      /** {@link PTransform} that calls a UDF and returns both success and failure output. */
      private static class RunUdf extends PTransform<PCollection<String>, PCollectionTuple> {
        private final PubSubProtoToBigQueryOptions options;
    
        RunUdf(PubSubProtoToBigQueryOptions options) {
          this.options = options;
        }
    
        @Override
        public PCollectionTuple expand(PCollection<String> input) {
          return input
              .apply("Prepare Failsafe UDF", makeFailsafe())
              .setCoder(FAILSAFE_CODER)
              .apply(
                  "Call UDF",
                  FailsafeJavascriptUdf.<String>newBuilder()
                      .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
                      .setFunctionName(options.getJavascriptTextTransformFunctionName())
                      .setReloadIntervalMinutes(
                          options.getJavascriptTextTransformReloadIntervalMinutes())
                      .setSuccessTag(UDF_SUCCESS_TAG)
                      .setFailureTag(UDF_FAILURE_TAG)
                      .build());
        }
    
        private static MapElements<String, FailsafeElement<String, String>> makeFailsafe() {
          return MapElements.into(new TypeDescriptor<FailsafeElement<String, String>>() {})
              .via((String json) -> FailsafeElement.of(json, json));
        }
      }
    
      /** {@link PTransform} that calls a python UDF and returns both success and failure output. */
      private static class RunPythonUdf extends PTransform<PCollection<String>, PCollectionTuple> {
        private final PubSubProtoToBigQueryOptions options;
    
        RunPythonUdf(PubSubProtoToBigQueryOptions options) {
          this.options = options;
        }
    
        @Override
        public PCollectionTuple expand(PCollection<String> input) {
          return input
              .apply("Prepare Failsafe row", stringMappingFunction())
              .setCoder(
                  RowCoder.of(PythonExternalTextTransformer.FailsafeRowPythonExternalUdf.ROW_SCHEMA))
              .apply(
                  "InvokeUDF",
                  PythonExternalTextTransformer.FailsafePythonExternalUdf.newBuilder()
                      .setFileSystemPath(options.getPythonExternalTextTransformGcsPath())
                      .setFunctionName(options.getPythonExternalTextTransformFunctionName())
                      .build())
              .apply(
                  "MapRowsToFailsafeElements",
                  ParDo.of(new RowToStringFailsafeElementFn(UDF_SUCCESS_TAG, UDF_FAILURE_TAG))
                      .withOutputTags(UDF_SUCCESS_TAG, TupleTagList.of(UDF_FAILURE_TAG)));
        }
      }
    
      /**
       * Returns a {@link PubsubIO.Write} configured to write UDF failures to the appropriate output
       * topic.
       */
      private static PubsubIO.Write<PubsubMessage> writeUdfFailures(
          PubSubProtoToBigQueryOptions options) {
        PubsubIO.Write<PubsubMessage> write = PubsubIO.writeMessages();
        return Strings.isNullOrEmpty(options.getUdfOutputTopic())
            ? write.to(options.getOutputTopic())
            : write.to(options.getUdfOutputTopic());
      }
    }
    

    다음 단계