Cloud Storage CSV files to BigQuery template

The Cloud Storage CSV files to BigQuery pipeline is a batch pipeline that reads data from CSV files stored in Cloud Storage and appends the result to a BigQuery table. The CSV files can be uncompressed or compressed in the formats listed on the SDK Compression Enum page.

Pipeline requirements

To use this template, your pipeline must meet the following requirements.

BigQuery schema JSON file

Create a JSON file that describes your BigQuery schema. Ensure that the schema has a top-level JSON array titled BigQuery Schema and that its contents follow the pattern {"name": "COLUMN_NAME", "type": "DATA_TYPE"}.

The Cloud Storage CSV files to BigQuery batch template doesn't support importing data into STRUCT (Record) fields in the target BigQuery table.

The following JSON describes an example BigQuery schema:

{
  "BigQuery Schema": [
    {
      "name": "location",
      "type": "STRING"
    },
    {
      "name": "name",
      "type": "STRING"
    },
    {
      "name": "age",
      "type": "STRING"
    },
    {
      "name": "color",
      "type": "STRING"
    },
    {
      "name": "coffee",
      "type": "STRING"
    }
  ]
}
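
For reference, a CSV row that matches this example schema could look like the following. The rows are hypothetical and assume a comma delimiter with no header line:

Seattle,Alex,29,blue,espresso
Denver,Maria,34,green,latte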

Error table schema

The BigQuery table that stores the rejected records from the CSV files must match the table schema defined here.

{
  "BigQuery Schema": [
    {
      "name": "RawContent",
      "type": "STRING"
    },
    {
      "name": "ErrorMsg",
      "type": "STRING"
    }
  ]
}
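
The pipeline creates this table if it doesn't exist, because it writes with the CREATE_IF_NEEDED disposition. If you prefer to create the bad-records table ahead of time, a bq CLI command along these lines creates a table with a matching schema; the dataset and table names below are placeholders:

bq mk --table YOUR_DATASET.YOUR_BAD_RECORDS_TABLE RawContent:STRING,ErrorMsg:STRING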

Template parameters

Required parameters

  • inputFilePattern: The Cloud Storage path to the CSV file that contains the text to process. For example, gs://your-bucket/path/*.csv.
  • schemaJSONPath: The Cloud Storage path to the JSON file that defines your BigQuery schema.
  • outputTable: The name of the BigQuery table that stores your processed data. If you reuse an existing BigQuery table, the data is appended to the destination table.
  • bigQueryLoadingTemporaryDirectory: The temporary directory to use during the BigQuery loading process. For example, gs://your-bucket/your-files/temp_dir.
  • badRecordsOutputTable: The name of the BigQuery table to use to store the rejected data when processing the CSV files. If you reuse an existing BigQuery table, the data is appended to the destination table. The schema of this table must match the error table schema (https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-csv-to-bigquery#GcsCSVToBigQueryBadRecordsSchema).
  • delimiter: The column delimiter that the CSV file uses. For example, ,.
  • csvFormat: The CSV format according to Apache Commons CSV format. Defaults to: Default.

Optional parameters

  • containsHeaders: Whether headers are included in the CSV file. Defaults to: false.
  • csvFileEncoding: The CSV file character encoding format. Allowed values are US-ASCII, ISO-8859-1, UTF-8, and UTF-16. Defaults to: UTF-8.

Run the template

  1. Go to the Dataflow Create job from template page.
  2. Go to Create job from template
  3. In the Job name field, enter a unique job name.
  4. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  5. From the Dataflow template drop-down menu, select the CSV files on Cloud Storage to BigQuery (Batch) template.
  6. In the provided parameter fields, enter your parameter values.
  7. Click Run job.

In your shell or terminal, run the template:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/GCS_CSV_to_BigQuery \
    --region REGION_NAME \
    --parameters \
inputFilePattern=PATH_TO_CSV_DATA,\
schemaJSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
outputTable=BIGQUERY_DESTINATION_TABLE,\
badRecordsOutputTable=BIGQUERY_BAD_RECORDS_TABLE,\
csvFormat=CSV_FORMAT,\
delimiter=DELIMITER,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS,\
containsHeaders=CONTAINS_HEADERS,\
csvFileEncoding=CSV_FILE_ENCODING

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use. You can use latest to use the latest version of the template, or a specific version name to pin the template version.

  • REGION_NAME: the region where you want to deploy your Dataflow job, for example, us-central1
  • PATH_TO_CSV_DATA: the Cloud Storage path to your CSV files
  • PATH_TO_BIGQUERY_SCHEMA_JSON: the Cloud Storage path to the JSON file containing the schema definition
  • BIGQUERY_DESTINATION_TABLE: the BigQuery destination table name
  • BIGQUERY_BAD_RECORDS_TABLE: the name of the BigQuery table for bad records
  • PATH_TO_TEMP_DIR_ON_GCS: the Cloud Storage path to the temporary directory
  • DELIMITER: the CSV file delimiter
  • CSV_FORMAT: the CSV format specification for parsing the records
  • CONTAINS_HEADERS: whether the CSV files contain headers
  • CSV_FILE_ENCODING: the character encoding of the CSV files
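
For example, the following invocation fills in the placeholders with hypothetical values: the latest template version in us-central1, a bucket named example-bucket, and a dataset named example_dataset in the project example-project. The example assumes a semicolon-delimited CSV file; passing a literal comma as the delimiter requires gcloud's alternative flag-delimiter escaping (see gcloud topic escaping).

gcloud dataflow jobs run csv-to-bigquery-example \
    --gcs-location gs://dataflow-templates-us-central1/latest/GCS_CSV_to_BigQuery \
    --region us-central1 \
    --parameters \
inputFilePattern=gs://example-bucket/data/*.csv,\
schemaJSONPath=gs://example-bucket/schemas/schema.json,\
outputTable=example-project:example_dataset.example_table,\
badRecordsOutputTable=example-project:example_dataset.example_table_errors,\
csvFormat=Default,\
delimiter=';',\
bigQueryLoadingTemporaryDirectory=gs://example-bucket/temp,\
containsHeaders=false,\
csvFileEncoding=UTF-8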

To run the template using the REST API, send an HTTP POST request. For more information about the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/GCS_CSV_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern":"PATH_TO_CSV_DATA",
       "schemaJSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "outputTable":"BIGQUERY_DESTINATION_TABLE",
       "badRecordsOutputTable":"BIGQUERY_BAD_RECORDS_TABLE",
       "csvFormat":"CSV_FORMAT",
       "delimiter":"DELIMITER",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS",
       "containsHeaders": "CONTAINS_HEADERS",
       "csvFileEncoding": "CSV_FILE_ENCODING"
   },
   "environment": { "zone": "us-central1-f" }
}

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use. You can use latest to use the latest version of the template, or a specific version name to pin the template version.

  • LOCATION: the region where you want to deploy your Dataflow job, for example, us-central1
  • PATH_TO_CSV_DATA: the Cloud Storage path to your CSV files
  • PATH_TO_BIGQUERY_SCHEMA_JSON: the Cloud Storage path to the JSON file containing the schema definition
  • BIGQUERY_DESTINATION_TABLE: the BigQuery destination table name
  • BIGQUERY_BAD_RECORDS_TABLE: the name of the BigQuery table for bad records
  • PATH_TO_TEMP_DIR_ON_GCS: the Cloud Storage path to the temporary directory
  • DELIMITER: the CSV file delimiter
  • CSV_FORMAT: the CSV format specification for parsing the records
  • CONTAINS_HEADERS: whether the CSV files contain headers
  • CSV_FILE_ENCODING: the character encoding of the CSV files
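
As a sketch, you can send the request above with curl, using an access token from the gcloud CLI. This assumes the JSON body shown above is saved in a local file named request.json, and that the PROJECT_ID, LOCATION, and VERSION placeholders in the URL have been replaced:

curl -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    -d @request.json \
    "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/GCS_CSV_to_BigQuery"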

Template source code (Java)
/*
 * Copyright (C) 2024 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.templates.CSVToBigQuery.Options;
import com.google.cloud.teleport.templates.common.CsvConverters;
import com.google.cloud.teleport.templates.common.CsvConverters.CsvPipelineOptions;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
import org.json.JSONArray;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Templated pipeline to read CSV files from Cloud Storage, and write them to BigQuery.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_GCS_CSV_to_BigQuery.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "GCS_CSV_to_BigQuery",
    category = TemplateCategory.BATCH,
    displayName = "CSV Files on Cloud Storage to BigQuery",
    description =
        "The Cloud Storage CSV to BigQuery pipeline is a batch pipeline that allows you to read CSV files stored in "
            + "Cloud Storage, and append the result to a BigQuery table. The CSV files can be uncompressed or compressed in formats listed in https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/Compression.html.",
    optionsClass = Options.class,
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "Create a JSON file that describes your {{bigquery_name_short}} schema.\n"
          + "    <p>Ensure that there is a top-level JSON array titled <code>BigQuery Schema</code> and that its\n"
          + "      contents follow the pattern <code>{\"name\": \"COLUMN_NAME\", \"type\": \"DATA_TYPE\"}</code>.</p>\n"
          + "    <p>The following JSON describes an example BigQuery schema:</p>\n"
          + "<pre class=\"prettyprint lang-json\">\n"
          + "{\n"
          + "  \"BigQuery Schema\": [\n"
          + "    {\n"
          + "      \"name\": \"location\",\n"
          + "      \"type\": \"STRING\"\n"
          + "    },\n"
          + "    {\n"
          + "      \"name\": \"name\",\n"
          + "      \"type\": \"STRING\"\n"
          + "    },\n"
          + "    {\n"
          + "      \"name\": \"age\",\n"
          + "      \"type\": \"STRING\"\n"
          + "    },\n"
          + "    {\n"
          + "      \"name\": \"color\",\n"
          + "      \"type\": \"STRING\"\n"
          + "    },\n"
          + "    {\n"
          + "      \"name\": \"coffee\",\n"
          + "      \"type\": \"STRING\"\n"
          + "    }\n"
          + "  ]\n"
          + "}\n"
    })
public class CSVToBigQuery {

  /** Options supported by {@link CSVToBigQuery}. */
  public interface Options extends DataflowPipelineOptions, CsvPipelineOptions {

    @TemplateParameter.Text(
        order = 1,
        groupName = "Source",
        description = "Cloud Storage Input File(s)",
        helpText = "The Cloud Storage path to the CSV file that contains the text to process.",
        regexes = {"^gs:\\/\\/[^\\n\\r]+$"},
        example = "gs://your-bucket/path/*.csv")
    ValueProvider<String> getInputFilePattern();

    void setInputFilePattern(ValueProvider<String> value);

    @TemplateParameter.GcsReadFile(
        order = 2,
        groupName = "Target",
        description = "Cloud Storage location of your BigQuery schema file, described as a JSON",
        helpText = "The Cloud Storage path to the JSON file that defines your BigQuery schema.")
    ValueProvider<String> getSchemaJSONPath();

    void setSchemaJSONPath(ValueProvider<String> value);

    @TemplateParameter.BigQueryTable(
        order = 3,
        groupName = "Target",
        description = "BigQuery output table",
        helpText =
            "The name of the BigQuery table that stores your processed data. If you reuse an existing "
                + "BigQuery table, the data is appended to the destination table.")
    ValueProvider<String> getOutputTable();

    void setOutputTable(ValueProvider<String> value);

    @TemplateParameter.GcsWriteFolder(
        order = 4,
        description = "Temporary directory for BigQuery loading process",
        helpText = "The temporary directory to use during the BigQuery loading process.",
        example = "gs://your-bucket/your-files/temp_dir")
    @Validation.Required
    ValueProvider<String> getBigQueryLoadingTemporaryDirectory();

    void setBigQueryLoadingTemporaryDirectory(ValueProvider<String> directory);

    @TemplateParameter.BigQueryTable(
        order = 5,
        description = "BigQuery output table for bad records",
        helpText =
            "The name of the BigQuery table to use to store the rejected data when processing the"
                + " CSV files. If you reuse an existing BigQuery table, the data is appended to the"
                + " destination table. The schema of this table must match the"
                + " error table schema (https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-csv-to-bigquery#GcsCSVToBigQueryBadRecordsSchema).")
    ValueProvider<String> getBadRecordsOutputTable();

    void setBadRecordsOutputTable(ValueProvider<String> value);
  }

  private static final Logger LOG = LoggerFactory.getLogger(CSVToBigQuery.class);

  private static final String BIGQUERY_SCHEMA = "BigQuery Schema";
  private static final String NAME = "name";
  private static final String TYPE = "type";
  private static final String MODE = "mode";
  private static final String RECORD_TYPE = "RECORD";
  private static final String FIELDS_ENTRY = "fields";

  /** The tag for the headers of the CSV if required. */
  private static final TupleTag<String> CSV_HEADERS = new TupleTag<String>() {};

  /** The tag for the lines of the CSV. */
  private static final TupleTag<String> CSV_LINES = new TupleTag<String>() {};

  /** The tag for the lines of the CSV that match the destination table schema. */
  private static final TupleTag<TableRow> GOOD_RECORDS = new TupleTag<TableRow>() {};

  /** The tag for the lines of the CSV that do not match the destination table schema. */
  private static final TupleTag<TableRow> BAD_RECORDS = new TupleTag<TableRow>() {};

  /** The schema of the BigQuery table for the bad records. */
  private static final TableSchema errorTableSchema =
      new TableSchema()
          .setFields(
              ImmutableList.of(
                  new TableFieldSchema().setName("RawContent").setType("STRING"),
                  new TableFieldSchema().setName("ErrorMsg").setType("STRING")));

  private static class StringToTableRowFn extends DoFn<String, TableRow> {
    private final ValueProvider<String> delimiter;
    private final NestedValueProvider<List<String>, String> fields;

    public StringToTableRowFn(
        NestedValueProvider<List<String>, String> schemaFields, ValueProvider<String> delimiter) {
      this.delimiter = delimiter;
      this.fields = schemaFields;
    }

    @ProcessElement
    public void processElement(ProcessContext context) throws IllegalArgumentException {
      TableRow outputTableRow = new TableRow();
      String[] rowValue =
          Splitter.on(delimiter.get()).splitToList(context.element()).toArray(new String[0]);
      if (rowValue.length != fields.get().size()) {
        LOG.error("Number of fields in the schema and number of Csv headers do not match.");
        outputTableRow.set("RawContent", String.join(delimiter.get(), rowValue));
        outputTableRow.set(
            "ErrorMsg", "Number of fields in the schema and number of Csv headers do not match.");
        context.output(BAD_RECORDS, outputTableRow);
      } else {
        for (int i = 0; i < fields.get().size(); ++i) {
          outputTableRow.set(fields.get().get(i), rowValue[i]);
        }
        context.output(GOOD_RECORDS, outputTableRow);
      }
    }
  }

  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);

    PCollectionTuple tableRows =
        pipeline
            .apply(
                "ReadCsvFile",
                CsvConverters.ReadCsv.newBuilder()
                    .setInputFileSpec(options.getInputFilePattern())
                    .setHasHeaders(options.getContainsHeaders())
                    .setHeaderTag(CSV_HEADERS)
                    .setLineTag(CSV_LINES)
                    .setCsvFormat(options.getCsvFormat())
                    .setDelimiter(options.getDelimiter())
                    .setFileEncoding(options.getCsvFileEncoding())
                    .build())
            .get(CSV_LINES)
            .apply(
                "ConvertToTableRow",
                ParDo.of(
                        new StringToTableRowFn(
                            NestedValueProvider.of(
                                options.getSchemaJSONPath(),
                                jsonPath -> {
                                  List<String> fields = new ArrayList<>();
                                  SchemaParser schemaParser = new SchemaParser();

                                  try {
                                    JSONObject jsonSchema = schemaParser.parseSchema(jsonPath);
                                    JSONArray bqSchemaJsonArray =
                                        jsonSchema.getJSONArray(BIGQUERY_SCHEMA);

                                    for (int i = 0; i < bqSchemaJsonArray.length(); i++) {
                                      JSONObject inputField = bqSchemaJsonArray.getJSONObject(i);
                                      fields.add(inputField.getString(NAME));
                                    }

                                  } catch (Exception e) {
                                    throw new RuntimeException(
                                        "Error parsing schema " + jsonPath, e);
                                  }
                                  return fields;
                                }),
                            options.getDelimiter()))
                    .withOutputTags(GOOD_RECORDS, TupleTagList.of(BAD_RECORDS)));

    tableRows
        .get(GOOD_RECORDS)
        .apply(
            "Insert good records into Bigquery",
            BigQueryIO.writeTableRows()
                .withSchema(
                    NestedValueProvider.of(
                        options.getSchemaJSONPath(),
                        schemaPath -> {
                          TableSchema tableSchema = new TableSchema();
                          List<TableFieldSchema> fields = new ArrayList<>();
                          SchemaParser schemaParser = new SchemaParser();

                          try {
                            JSONObject jsonSchema = schemaParser.parseSchema(schemaPath);
                            JSONArray bqSchemaJsonArray = jsonSchema.getJSONArray(BIGQUERY_SCHEMA);

                            for (int i = 0; i < bqSchemaJsonArray.length(); i++) {
                              JSONObject inputField = bqSchemaJsonArray.getJSONObject(i);
                              fields.add(convertToTableFieldSchema(inputField));
                            }
                            tableSchema.setFields(fields);

                          } catch (Exception e) {
                            throw new RuntimeException("Error parsing schema " + schemaPath, e);
                          }
                          return tableSchema;
                        }))
                .to(options.getOutputTable())
                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                .withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory()));

    tableRows
        .get(BAD_RECORDS)
        .apply(
            "Insert bad records into Bigquery",
            BigQueryIO.writeTableRows()
                .withSchema(errorTableSchema)
                .to(options.getBadRecordsOutputTable())
                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                .withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory()));

    pipeline.run();
  }

  /**
   * Convert a JSONObject from the Schema JSON to a TableFieldSchema. In case of RECORD, it handles
   * the conversion recursively.
   *
   * @param inputField Input field to convert.
   * @return TableFieldSchema instance to populate the schema.
   */
  private static TableFieldSchema convertToTableFieldSchema(JSONObject inputField) {
    TableFieldSchema field =
        new TableFieldSchema()
            .setName(inputField.getString(NAME))
            .setType(inputField.getString(TYPE));

    if (inputField.has(MODE)) {
      field.setMode(inputField.getString(MODE));
    }

    if (inputField.getString(TYPE) != null && inputField.getString(TYPE).equals(RECORD_TYPE)) {
      List<TableFieldSchema> nestedFields = new ArrayList<>();
      JSONArray fieldsArr = inputField.getJSONArray(FIELDS_ENTRY);
      for (int i = 0; i < fieldsArr.length(); i++) {
        JSONObject nestedJSON = fieldsArr.getJSONObject(i);
        nestedFields.add(convertToTableFieldSchema(nestedJSON));
      }
      field.setFields(nestedFields);
    }

    return field;
  }
}
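
If you want to build and stage this classic template yourself instead of using the Google-provided build, the usual approach for classic (ValueProvider-based) templates is a Maven invocation along the following lines, run from the module that contains this class. The main class comes from the source above; the project ID and bucket are placeholders, and the exact build profile or flags may differ in the DataflowTemplates repository:

mvn compile exec:java \
    -Dexec.mainClass=com.google.cloud.teleport.templates.CSVToBigQuery \
    -Dexec.args="--project=PROJECT_ID \
        --stagingLocation=gs://YOUR_BUCKET/staging \
        --tempLocation=gs://YOUR_BUCKET/temp \
        --templateLocation=gs://YOUR_BUCKET/templates/GCS_CSV_to_BigQuery \
        --runner=DataflowRunner"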

What's next