/*
 * Copyright (C) 2024 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.templates.CSVToBigQuery.Options;
import com.google.cloud.teleport.templates.common.CsvConverters;
import com.google.cloud.teleport.templates.common.CsvConverters.CsvPipelineOptions;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.json.JSONArray;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Templated pipeline to read CSV files from Cloud Storage, and write it to BigQuery.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_GCS_CSV_to_BigQuery.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "GCS_CSV_to_BigQuery",
    category = TemplateCategory.BATCH,
    displayName = "CSV Files on Cloud Storage to BigQuery",
    description =
        "The Cloud Storage CSV to BigQuery pipeline is a batch pipeline that allows you to read CSV files stored in "
            + "Cloud Storage, and append the result to a BigQuery table. The CSV files can be uncompressed or compressed in formats listed in https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/Compression.html.",
    optionsClass = Options.class,
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "Create a JSON file that describes your {{bigquery_name_short}} schema.\n"
          + "    <p>Ensure that there is a top-level JSON array titled <code>BigQuery Schema</code> and that its\n"
          + "      contents follow the pattern <code>{\"name\": \"COLUMN_NAME\", \"type\": \"DATA_TYPE\"}</code>.</p>\n"
          + "    <p>The following JSON describes an example BigQuery schema:</p>\n"
          + "<pre class=\"prettyprint lang-json\">\n"
          + "{\n"
          + "  \"BigQuery Schema\": [\n"
          + "    {\n"
          + "      \"name\": \"location\",\n"
          + "      \"type\": \"STRING\"\n"
          + "    },\n"
          + "    {\n"
          + "      \"name\": \"name\",\n"
          + "      \"type\": \"STRING\"\n"
          + "    },\n"
          + "    {\n"
          + "      \"name\": \"age\",\n"
          + "      \"type\": \"STRING\"\n"
          + "    },\n"
          + "    {\n"
          + "      \"name\": \"color\",\n"
          + "      \"type\": \"STRING\"\n"
          + "    },\n"
          + "    {\n"
          + "      \"name\": \"coffee\",\n"
          + "      \"type\": \"STRING\"\n"
          + "    }\n"
          + "  ]\n"
          + "}\n"
    })
public class CSVToBigQuery {
  /** Options supported by {@link CSVToBigQuery}. */
  public interface Options extends DataflowPipelineOptions, CsvPipelineOptions {
    @TemplateParameter.Text(
        order = 1,
        groupName = "Source",
        description = "Cloud Storage Input File(s)",
        helpText = "The Cloud Storage path to the CSV file that contains the text to process.",
        regexes = {"^gs:\\/\\/[^\\n\\r]+$"},
        example = "gs://your-bucket/path/*.csv")
    ValueProvider<String> getInputFilePattern();
    void setInputFilePattern(ValueProvider<String> value);
    @TemplateParameter.GcsReadFile(
        order = 2,
        groupName = "Target",
        description = "Cloud Storage location of your BigQuery schema file, described as a JSON",
        helpText = "The Cloud Storage path to the JSON file that defines your BigQuery schema.")
    ValueProvider<String> getSchemaJSONPath();
    void setSchemaJSONPath(ValueProvider<String> value);
    @TemplateParameter.BigQueryTable(
        order = 3,
        groupName = "Target",
        description = "BigQuery output table",
        helpText =
            "The name of the BigQuery table that stores your processed data. If you reuse an existing "
                + "BigQuery table, the data is appended to the destination table.")
    ValueProvider<String> getOutputTable();
    void setOutputTable(ValueProvider<String> value);
    @TemplateParameter.GcsWriteFolder(
        order = 4,
        description = "Temporary directory for BigQuery loading process",
        helpText = "The temporary directory to use during the BigQuery loading process.",
        example = "gs://your-bucket/your-files/temp_dir")
    @Validation.Required
    ValueProvider<String> getBigQueryLoadingTemporaryDirectory();
    void setBigQueryLoadingTemporaryDirectory(ValueProvider<String> directory);
    @TemplateParameter.BigQueryTable(
        order = 5,
        description = "BigQuery output table for bad records",
        helpText =
            "The name of the BigQuery table to use to store the rejected data when processing the"
                + " CSV files. If you reuse an existing BigQuery table, the data is appended to the"
                + " destination table. The schema of this table must match the"
                + " error table schema (https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-csv-to-bigquery#GcsCSVToBigQueryBadRecordsSchema).")
    ValueProvider<String> getBadRecordsOutputTable();
    void setBadRecordsOutputTable(ValueProvider<String> value);
  }
  private static final Logger LOG = LoggerFactory.getLogger(CSVToBigQuery.class);
  private static final String BIGQUERY_SCHEMA = "BigQuery Schema";
  private static final String NAME = "name";
  private static final String TYPE = "type";
  private static final String MODE = "mode";
  private static final String RECORD_TYPE = "RECORD";
  private static final String FIELDS_ENTRY = "fields";
  /** The tag for the headers of the CSV if required. */
  private static final TupleTag<Iterable<String>> CSV_HEADERS = new TupleTag<Iterable<String>>() {};
  /** The tag for the lines of the CSV. */
  private static final TupleTag<Iterable<String>> CSV_LINES = new TupleTag<Iterable<String>>() {};
  /** The tag for the line of the CSV that matches destination table schema. */
  private static final TupleTag<TableRow> GOOD_RECORDS = new TupleTag<TableRow>() {};
  /** The tag for the lines of the CSV that does not match destination table schema. */
  private static final TupleTag<TableRow> BAD_RECORDS = new TupleTag<TableRow>() {};
  /** The schema of the BigQuery table for the bad records. */
  private static final TableSchema errorTableSchema =
      new TableSchema()
          .setFields(
              ImmutableList.of(
                  new TableFieldSchema().setName("RawContent").setType("STRING"),
                  new TableFieldSchema().setName("ErrorMsg").setType("STRING")));
  private static class StringListToTableRowFn extends DoFn<Iterable<String>, TableRow> {
    private final ValueProvider<String> delimiter;
    private final NestedValueProvider<List<String>, String> fields;
    public StringListToTableRowFn(
        NestedValueProvider<List<String>, String> schemaFields, ValueProvider<String> delimiter) {
      this.delimiter = delimiter;
      this.fields = schemaFields;
    }
    @ProcessElement
    public void processElement(ProcessContext context) throws IllegalArgumentException {
      TableRow outputTableRow = new TableRow();
      String[] rowValue = ImmutableList.copyOf(context.element()).toArray(new String[0]);
      if (rowValue.length != fields.get().size()) {
        LOG.error("Number of fields in the schema and number of Csv headers do not match.");
        outputTableRow.set("RawContent", String.join(delimiter.get(), rowValue));
        outputTableRow.set(
            "ErrorMsg", "Number of fields in the schema and number of Csv headers do not match.");
        context.output(BAD_RECORDS, outputTableRow);
      } else {
        for (int i = 0; i < fields.get().size(); ++i) {
          outputTableRow.set(fields.get().get(i), rowValue[i]);
        }
        context.output(GOOD_RECORDS, outputTableRow);
      }
    }
  }
  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);
    PCollectionTuple tableRows =
        pipeline
            .apply(
                "ReadCsvFile",
                CsvConverters.ReadCsv.newBuilder()
                    .setInputFileSpec(options.getInputFilePattern())
                    .setHasHeaders(options.getContainsHeaders())
                    .setHeaderTag(CSV_HEADERS)
                    .setLineTag(CSV_LINES)
                    .setCsvFormat(options.getCsvFormat())
                    .setDelimiter(options.getDelimiter())
                    .setFileEncoding(options.getCsvFileEncoding())
                    .build())
            .get(CSV_LINES)
            .apply(
                "ConvertToTableRow",
                ParDo.of(
                        new StringListToTableRowFn(
                            NestedValueProvider.of(
                                options.getSchemaJSONPath(),
                                jsonPath -> {
                                  List<String> fields = new ArrayList<>();
                                  SchemaParser schemaParser = new SchemaParser();
                                  try {
                                    JSONObject jsonSchema = schemaParser.parseSchema(jsonPath);
                                    JSONArray bqSchemaJsonArray =
                                        jsonSchema.getJSONArray(BIGQUERY_SCHEMA);
                                    for (int i = 0; i < bqSchemaJsonArray.length(); i++) {
                                      JSONObject inputField = bqSchemaJsonArray.getJSONObject(i);
                                      fields.add(inputField.getString(NAME));
                                    }
                                  } catch (Exception e) {
                                    throw new RuntimeException(
                                        "Error parsing schema " + jsonPath, e);
                                  }
                                  return fields;
                                }),
                            options.getDelimiter()))
                    .withOutputTags(GOOD_RECORDS, TupleTagList.of(BAD_RECORDS)));
    tableRows
        .get(GOOD_RECORDS)
        .apply(
            "Insert good records into Bigquery",
            BigQueryIO.writeTableRows()
                .withSchema(
                    NestedValueProvider.of(
                        options.getSchemaJSONPath(),
                        schemaPath -> {
                          TableSchema tableSchema = new TableSchema();
                          List<TableFieldSchema> fields = new ArrayList<>();
                          SchemaParser schemaParser = new SchemaParser();
                          try {
                            JSONObject jsonSchema = schemaParser.parseSchema(schemaPath);
                            JSONArray bqSchemaJsonArray = jsonSchema.getJSONArray(BIGQUERY_SCHEMA);
                            for (int i = 0; i < bqSchemaJsonArray.length(); i++) {
                              JSONObject inputField = bqSchemaJsonArray.getJSONObject(i);
                              fields.add(convertToTableFieldSchema(inputField));
                            }
                            tableSchema.setFields(fields);
                          } catch (Exception e) {
                            throw new RuntimeException("Error parsing schema " + schemaPath, e);
                          }
                          return tableSchema;
                        }))
                .to(options.getOutputTable())
                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                .withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory()));
    tableRows
        .get(BAD_RECORDS)
        .apply(
            "Insert bad records into Bigquery",
            BigQueryIO.writeTableRows()
                .withSchema(errorTableSchema)
                .to(options.getBadRecordsOutputTable())
                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                .withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory()));
    pipeline.run();
  }
  /**
   * Convert a JSONObject from the Schema JSON to a TableFieldSchema. In case of RECORD, it handles
   * the conversion recursively.
   *
   * @param inputField Input field to convert.
   * @return TableFieldSchema instance to populate the schema.
   */
  private static TableFieldSchema convertToTableFieldSchema(JSONObject inputField) {
    TableFieldSchema field =
        new TableFieldSchema()
            .setName(inputField.getString(NAME))
            .setType(inputField.getString(TYPE));
    if (inputField.has(MODE)) {
      field.setMode(inputField.getString(MODE));
    }
    if (inputField.getString(TYPE) != null && inputField.getString(TYPE).equals(RECORD_TYPE)) {
      List<TableFieldSchema> nestedFields = new ArrayList<>();
      JSONArray fieldsArr = inputField.getJSONArray(FIELDS_ENTRY);
      for (int i = 0; i < fieldsArr.length(); i++) {
        JSONObject nestedJSON = fieldsArr.getJSONObject(i);
        nestedFields.add(convertToTableFieldSchema(nestedJSON));
      }
      field.setFields(nestedFields);
    }
    return field;
  }
}