/*
 * Copyright (C) 2021 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer.FailsafeRowPythonExternalUdf.stringMappingFunction;
import static java.nio.charset.StandardCharsets.UTF_8;
import com.google.cloud.teleport.metadata.MultiTemplate;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.BigQueryCommonOptions.WriteOptions;
import com.google.cloud.teleport.v2.options.BigQueryStorageApiStreamingOptions;
import com.google.cloud.teleport.v2.options.PubsubCommonOptions.ReadSubscriptionOptions;
import com.google.cloud.teleport.v2.options.PubsubCommonOptions.WriteTopicOptions;
import com.google.cloud.teleport.v2.templates.PubsubProtoToBigQuery.PubSubProtoToBigQueryOptions;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.transforms.FailsafeElementTransforms.ConvertFailsafeElementToPubsubMessage;
import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.FailsafeJavascriptUdf;
import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer;
import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer.RowToStringFailsafeElementFn;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.cloud.teleport.v2.utils.GCSUtils;
import com.google.cloud.teleport.v2.utils.SchemaUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.Read;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.commons.lang3.ArrayUtils;

/**
 * A template for writing <a href="https://developers.google.com/protocol-buffers">Protobuf</a>
 * records from Pub/Sub to BigQuery.
 *
 * <p>Persistent failures are written to a Pub/Sub unprocessed topic.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/pubsub-binary-to-bigquery/README_PubSub_Proto_to_BigQuery.md">README</a>
 * for instructions on how to use or modify this template.
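 *
 * <p>For illustration only, a Flex Template run might look like the following. The template spec
 * path and the parameter names inherited from the shared options interfaces (subscription, table,
 * topic) are shown here as assumptions; see the linked documentation for the authoritative
 * values.
 *
 * <pre>{@code
 * gcloud dataflow flex-template run "pubsub-proto-to-bq-example" \
 *     --template-file-gcs-location=gs://path/to/template/spec \
 *     --region=us-central1 \
 *     --parameters=protoSchemaPath=gs://my-bucket/schema.pb \
 *     --parameters=fullMessageName=my.package.MyMessage \
 *     --parameters=inputSubscription=projects/my-project/subscriptions/my-subscription \
 *     --parameters=outputTableSpec=my-project:my_dataset.my_table \
 *     --parameters=outputTopic=projects/my-project/topics/my-deadletter-topic
 * }</pre>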
 */
@MultiTemplate({
  @Template(
      name = "PubSub_Proto_to_BigQuery_Flex",
      category = TemplateCategory.STREAMING,
      displayName = "Pub/Sub Proto to BigQuery",
      description = {
        "The Pub/Sub proto to BigQuery template is a streaming pipeline that ingests proto data from a Pub/Sub subscription into a BigQuery table. "
            + "Any errors that occur while writing to the BigQuery table are streamed into a Pub/Sub unprocessed topic.\n",
        "A JavaScript user-defined function (UDF) can be provided to transform data. "
            + "Errors while executing the UDF can be sent to either a separate Pub/Sub topic or the same unprocessed topic as the BigQuery errors."
      },
      skipOptions = {
        "pythonExternalTextTransformGcsPath",
        "pythonExternalTextTransformFunctionName"
      },
      optionsClass = PubSubProtoToBigQueryOptions.class,
      flexContainerName = "pubsub-proto-to-bigquery",
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-proto-to-bigquery",
      contactInformation = "https://cloud.google.com/support",
      requirements = {
        "The input Pub/Sub subscription must exist.",
        "The schema file for the Proto records must exist on Cloud Storage.",
        "The output Pub/Sub topic must exist.",
        "The output BigQuery dataset must exist.",
        "If the BigQuery table exists, it must have a schema matching the proto data regardless of the <code>createDisposition</code> value."
      },
      streaming = true,
      supportsAtLeastOnce = true),
  @Template(
      name = "PubSub_Proto_to_BigQuery_Xlang",
      category = TemplateCategory.STREAMING,
      displayName = "Pub/Sub Proto to BigQuery with Python UDF",
      type = Template.TemplateType.XLANG,
      description = {
        "The Pub/Sub proto to BigQuery template is a streaming pipeline that ingests proto data from a Pub/Sub subscription into a BigQuery table. "
            + "Any errors that occur while writing to the BigQuery table are streamed into a Pub/Sub unprocessed topic.\n",
        "A Python user-defined function (UDF) can be provided to transform data. "
            + "Errors while executing the UDF can be sent to either a separate Pub/Sub topic or the same unprocessed topic as the BigQuery errors."
      },
      skipOptions = {
        "javascriptTextTransformGcsPath",
        "javascriptTextTransformFunctionName",
        "javascriptTextTransformReloadIntervalMinutes"
      },
      optionsClass = PubSubProtoToBigQueryOptions.class,
      flexContainerName = "pubsub-proto-to-bigquery-xlang",
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-proto-to-bigquery",
      contactInformation = "https://cloud.google.com/support",
      requirements = {
        "The input Pub/Sub subscription must exist.",
        "The schema file for the Proto records must exist on Cloud Storage.",
        "The output Pub/Sub topic must exist.",
        "The output BigQuery dataset must exist.",
        "If the BigQuery table exists, it must have a schema matching the proto data regardless of the <code>createDisposition</code> value."
      },
      streaming = true,
      supportsAtLeastOnce = true)
})
public final class PubsubProtoToBigQuery {
  private static final TupleTag<FailsafeElement<String, String>> UDF_SUCCESS_TAG = new TupleTag<>();
  private static final TupleTag<FailsafeElement<String, String>> UDF_FAILURE_TAG = new TupleTag<>();
  private static final FailsafeElementCoder<String, String> FAILSAFE_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();
    run(PipelineOptionsFactory.fromArgs(args).as(PubSubProtoToBigQueryOptions.class));
  }
  /** {@link org.apache.beam.sdk.options.PipelineOptions} for {@link PubsubProtoToBigQuery}. */
  public interface PubSubProtoToBigQueryOptions
      extends ReadSubscriptionOptions,
          WriteOptions,
          WriteTopicOptions,
          PythonExternalTextTransformer.PythonExternalTextTransformerOptions,
          BigQueryStorageApiStreamingOptions {
    @TemplateParameter.GcsReadFile(
        order = 1,
        description = "Cloud Storage Path to the Proto Schema File",
        helpText =
            "The Cloud Storage location of the self-contained proto schema file. For example,"
                + " `gs://path/to/my/file.pb`. You can generate this file with"
                + " the `--descriptor_set_out` flag of the protoc command."
                + " The `--include_imports` flag guarantees that the file is self-contained.")
    @Required
    String getProtoSchemaPath();
    void setProtoSchemaPath(String value);
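
    // Illustrative only: a typical way to produce the self-contained descriptor file expected by
    // protoSchemaPath above (the .proto file name is a placeholder):
    //   protoc --include_imports --descriptor_set_out=file.pb path/to/schema.proto
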
    @TemplateParameter.Text(
        order = 2,
        regexes = {"^.+([a-zA-Z][a-zA-Z0-9_]+\\.?)+[a-zA-Z0-9_]$"},
        description = "Full Proto Message Name",
        helpText =
            "The full proto message name. For example, `package.name`."
                + " `MessageName`, where `package.name` is the value provided for the"
                + " `package` statement and not the `java_package` statement.")
    @Required
    String getFullMessageName();
    void setFullMessageName(String value);
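
    // Illustrative only: for a schema declaring
    //   package com.example.tutorial;   // the proto `package` statement
    //   message Person { ... }
    // the expected fullMessageName value is "com.example.tutorial.Person", regardless of any
    // `option java_package = ...;` setting in the file.
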
    @TemplateParameter.Boolean(
        order = 3,
        optional = true,
        description = "Preserve Proto Field Names",
        helpText =
            "To preserve the original proto field name in JSON, set this property to `true`. "
                + "To use more standard JSON names, set to `false`."
                + " For example, `false` would change `field_name` to `fieldName`. Defaults to: `false`.")
    @Default.Boolean(false)
    Boolean getPreserveProtoFieldNames();
    void setPreserveProtoFieldNames(Boolean value);
    @TemplateParameter.GcsReadFile(
        order = 4,
        optional = true,
        description = "BigQuery Table Schema Path",
        helpText =
            "The Cloud Storage path to the BigQuery schema path. "
                + "If this value isn't provided, then the schema is inferred from the Proto schema.",
        example = "gs://MyBucket/bq_schema.json")
    String getBigQueryTableSchemaPath();
    void setBigQueryTableSchemaPath(String value);
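
    // Illustrative only: this file is handed to BigQueryIO's withJsonSchema, so it is assumed to
    // contain a JSON-serialized TableSchema, for example:
    //   {"fields": [{"name": "name", "type": "STRING", "mode": "NULLABLE"}]}
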
    @TemplateParameter.PubsubTopic(
        order = 5,
        optional = true,
        description = "Pub/Sub output topic for UDF failures",
        helpText =
            "The Pub/Sub topic storing the UDF errors."
                + " If this value isn't provided, UDF errors are sent to the same topic as `outputTopic`.",
        example = "projects/your-project-id/topics/your-topic-name")
    String getUdfOutputTopic();
    void setUdfOutputTopic(String udfOutputTopic);
    // Hide UseStorageWriteApiAtLeastOnce in the UI, because it is automatically turned on when
    // the pipeline runs in at-least-once (ALO) mode with the Storage Write API.
    @TemplateParameter.Boolean(
        order = 6,
        optional = true,
        parentName = "useStorageWriteApi",
        parentTriggerValues = {"true"},
        description = "Use at at-least-once semantics in BigQuery Storage Write API",
        helpText =
            "When using the Storage Write API, specifies the write semantics."
                + " To use at-least-once semantics (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), set this parameter to true`. To use exactly-once semantics, set the parameter to `false`."
                + " This parameter applies only when `useStorageWriteApi` is `true`. The default value is `false`.",
        hiddenUi = true)
    @Default.Boolean(false)
    @Override
    Boolean getUseStorageWriteApiAtLeastOnce();
    void setUseStorageWriteApiAtLeastOnce(Boolean value);
  }
  /** Runs the pipeline and returns the results. */
  private static PipelineResult run(PubSubProtoToBigQueryOptions options) {
    BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);
    Pipeline pipeline = Pipeline.create(options);
    Descriptor descriptor = getDescriptor(options);
    PCollection<String> maybeForUdf =
        pipeline
            .apply("Read From Pubsub", readPubsubMessages(options, descriptor))
            .apply("Dynamic Message to TableRow", new ConvertDynamicProtoMessageToJson(options));
    WriteResult writeResult =
        runUdf(maybeForUdf, options)
            .apply("Write to BigQuery", writeToBigQuery(options, descriptor));
    BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
        .apply(
            "Create Error Payload",
            ErrorConverters.BigQueryInsertErrorToPubsubMessage.<String>newBuilder()
                .setPayloadCoder(StringUtf8Coder.of())
                .setTranslateFunction(BigQueryConverters::tableRowToJson)
                .build())
        .apply("Write Failed BQ Records", PubsubIO.writeMessages().to(options.getOutputTopic()));
    return pipeline.run();
  }
  /** Gets the {@link Descriptor} for the message type in the Pub/Sub topic. */
  @VisibleForTesting
  static Descriptor getDescriptor(PubSubProtoToBigQueryOptions options) {
    String schemaPath = options.getProtoSchemaPath();
    String messageName = options.getFullMessageName();
    Descriptor descriptor = SchemaUtils.getProtoDomain(schemaPath).getDescriptor(messageName);
    if (descriptor == null) {
      throw new IllegalArgumentException(
          messageName + " is not a recognized message in " + schemaPath);
    }
    return descriptor;
  }
  /** Returns the {@link PTransform} for reading Pub/Sub messages. */
  private static Read<DynamicMessage> readPubsubMessages(
      PubSubProtoToBigQueryOptions options, Descriptor descriptor) {
    return PubsubIO.readProtoDynamicMessages(descriptor)
        .fromSubscription(options.getInputSubscription())
        .withDeadLetterTopic(options.getOutputTopic());
  }
  /**
   * Writes messages to BigQuery, creating the table if necessary and allowed in {@code options}.
   *
   * <p>The BigQuery schema will be inferred from {@code descriptor} unless a JSON schema path is
   * specified in {@code options}.
   */
  @VisibleForTesting
  static Write<String> writeToBigQuery(
      PubSubProtoToBigQueryOptions options, Descriptor descriptor) {
    Write<String> write =
        BigQueryConverters.<String>createWriteTransform(options)
            .withFormatFunction(BigQueryConverters::convertJsonToTableRow);
    String schemaPath = options.getBigQueryTableSchemaPath();
    if (Strings.isNullOrEmpty(schemaPath)) {
      return write.withSchema(
          SchemaUtils.createBigQuerySchema(descriptor, options.getPreserveProtoFieldNames()));
    } else {
      return write.withJsonSchema(GCSUtils.getGcsFileAsString(schemaPath));
    }
  }
  /** {@link PTransform} that handles converting {@link DynamicMessage} values to JSON. */
  private static class ConvertDynamicProtoMessageToJson
      extends PTransform<PCollection<DynamicMessage>, PCollection<String>> {
    private final boolean preserveProtoName;
    private ConvertDynamicProtoMessageToJson(PubSubProtoToBigQueryOptions options) {
      this.preserveProtoName = options.getPreserveProtoFieldNames();
    }
    @Override
    public PCollection<String> expand(PCollection<DynamicMessage> input) {
      return input.apply(
          "Map to JSON",
          MapElements.into(TypeDescriptors.strings())
              .via(
                  message -> {
                    try {
                      JsonFormat.Printer printer = JsonFormat.printer();
                      return preserveProtoName
                          ? printer.preservingProtoFieldNames().print(message)
                          : printer.print(message);
                    } catch (InvalidProtocolBufferException e) {
                      throw new RuntimeException(e);
                    }
                  }));
    }
  }
  /**
   * Handles running the UDF.
   *
   * <p>If {@code options} does not configure a UDF, then no UDF is called.
   *
   * <p>This may add a branch to the pipeline for outputting failed UDF records to an unprocessed
   * topic.
   *
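   * <p>For reference, a JavaScript UDF is expected to accept and return a single JSON string. A
   * minimal sketch (illustrative only; the function name is whatever is passed as
   * {@code javascriptTextTransformFunctionName}):
   *
   * <pre>{@code
   * function transform(inJson) {
   *   var obj = JSON.parse(inJson);
   *   obj.someField = 'someValue'; // hypothetical transformation
   *   return JSON.stringify(obj);
   * }
   * }</pre>
   *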
   * @param jsonCollection {@link PCollection} of JSON strings for use as input to the UDF
   * @param options the options containing info on running the UDF
   * @return the {@link PCollection} of UDF output as JSON or {@code jsonCollection} if UDF not
   *     called
   */
  @VisibleForTesting
  static PCollection<String> runUdf(
      PCollection<String> jsonCollection, PubSubProtoToBigQueryOptions options) {
    boolean useJavascriptUdf = !Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath());
    boolean usePythonUdf = !Strings.isNullOrEmpty(options.getPythonExternalTextTransformGcsPath());
    // In order to avoid generating a graph that makes it look like a UDF was called when none was
    // intended, simply return the input as "success" output.
    if (!useJavascriptUdf && !usePythonUdf) {
      return jsonCollection;
    }
    // For testing purposes, we need to do this check before creating the PTransform rather than
    // in `expand`. Otherwise, we get a NullPointerException due to the PTransform not returning
    // a value.
    if (useJavascriptUdf
        && Strings.isNullOrEmpty(options.getJavascriptTextTransformFunctionName())) {
      throw new IllegalArgumentException(
          "JavaScript function name cannot be null or empty if file is set");
    }
    if (usePythonUdf
        && Strings.isNullOrEmpty(options.getPythonExternalTextTransformFunctionName())) {
      throw new IllegalArgumentException(
          "Python function name cannot be null or empty if file is set");
    }
    if (usePythonUdf && useJavascriptUdf) {
      throw new IllegalArgumentException(
          "Either javascript or Python gcs path must be provided, but not both.");
    }
    PCollectionTuple maybeSuccess;
    if (usePythonUdf) {
      maybeSuccess = jsonCollection.apply("Run UDF", new RunPythonUdf(options));
    } else {
      maybeSuccess = jsonCollection.apply("Run UDF", new RunUdf(options));
    }
    maybeSuccess
        .get(UDF_FAILURE_TAG)
        .setCoder(FAILSAFE_CODER)
        .apply(
            "Get UDF Failures",
            ConvertFailsafeElementToPubsubMessage.<String, String>builder()
                .setOriginalPayloadSerializeFn(s -> ArrayUtils.toObject(s.getBytes(UTF_8)))
                .setErrorMessageAttributeKey("udfErrorMessage")
                .build())
        .apply("Write Failed UDF", writeUdfFailures(options));
    return maybeSuccess
        .get(UDF_SUCCESS_TAG)
        .setCoder(FAILSAFE_CODER)
        .apply(
            "Get UDF Output",
            MapElements.into(TypeDescriptors.strings()).via(FailsafeElement::getPayload))
        .setCoder(NullableCoder.of(StringUtf8Coder.of()));
  }
  /** {@link PTransform} that calls a UDF and returns both success and failure output. */
  private static class RunUdf extends PTransform<PCollection<String>, PCollectionTuple> {
    private final PubSubProtoToBigQueryOptions options;
    RunUdf(PubSubProtoToBigQueryOptions options) {
      this.options = options;
    }
    @Override
    public PCollectionTuple expand(PCollection<String> input) {
      return input
          .apply("Prepare Failsafe UDF", makeFailsafe())
          .setCoder(FAILSAFE_CODER)
          .apply(
              "Call UDF",
              FailsafeJavascriptUdf.<String>newBuilder()
                  .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
                  .setFunctionName(options.getJavascriptTextTransformFunctionName())
                  .setReloadIntervalMinutes(
                      options.getJavascriptTextTransformReloadIntervalMinutes())
                  .setSuccessTag(UDF_SUCCESS_TAG)
                  .setFailureTag(UDF_FAILURE_TAG)
                  .build());
    }
    private static MapElements<String, FailsafeElement<String, String>> makeFailsafe() {
      return MapElements.into(new TypeDescriptor<FailsafeElement<String, String>>() {})
          .via((String json) -> FailsafeElement.of(json, json));
    }
  }
  /** {@link PTransform} that calls a Python UDF and returns both success and failure output. */
  private static class RunPythonUdf extends PTransform<PCollection<String>, PCollectionTuple> {
    private final PubSubProtoToBigQueryOptions options;
    RunPythonUdf(PubSubProtoToBigQueryOptions options) {
      this.options = options;
    }
    @Override
    public PCollectionTuple expand(PCollection<String> input) {
      return input
          .apply("Prepare Failsafe row", stringMappingFunction())
          .setCoder(
              RowCoder.of(PythonExternalTextTransformer.FailsafeRowPythonExternalUdf.ROW_SCHEMA))
          .apply(
              "InvokeUDF",
              PythonExternalTextTransformer.FailsafePythonExternalUdf.newBuilder()
                  .setFileSystemPath(options.getPythonExternalTextTransformGcsPath())
                  .setFunctionName(options.getPythonExternalTextTransformFunctionName())
                  .build())
          .apply(
              "MapRowsToFailsafeElements",
              ParDo.of(new RowToStringFailsafeElementFn(UDF_SUCCESS_TAG, UDF_FAILURE_TAG))
                  .withOutputTags(UDF_SUCCESS_TAG, TupleTagList.of(UDF_FAILURE_TAG)));
    }
  }
  /**
   * Returns a {@link PubsubIO.Write} configured to write UDF failures to the appropriate output
   * topic.
   */
  private static PubsubIO.Write<PubsubMessage> writeUdfFailures(
      PubSubProtoToBigQueryOptions options) {
    PubsubIO.Write<PubsubMessage> write = PubsubIO.writeMessages();
    return Strings.isNullOrEmpty(options.getUdfOutputTopic())
        ? write.to(options.getOutputTopic())
        : write.to(options.getUdfOutputTopic());
  }
}