Fichiers CSV Cloud Storage vers BigQuery

Le pipeline "Fichiers CSV Cloud Storage vers BigQuery" est un pipeline par lots qui vous permet de lire les données des fichiers CSV stockés dans Cloud Storage et d'ajouter le résultat à une table BigQuery. Les fichiers CSV peuvent être non compressés ou compressés dans les formats listés sur la page Compression Enum SDK.

Conditions requises pour ce pipeline

Pour utiliser ce modèle, votre pipeline doit répondre aux exigences suivantes.

Fichier JSON du schéma BigQuery

Créez un fichier JSON décrivant votre schéma BigQuery. Assurez-vous que le schéma comporte un tableau JSON de niveau supérieur intitulé BigQuery Schema et que son contenu suit le modèle {"name": "COLUMN_NAME", "type": "DATA_TYPE"}.

Le modèle de traitement par lots des fichiers CSV Cloud Storage vers BigQuery n'est pas compatible avec l'importation de données dans les champs STRUCT (Enregistrement) de la table BigQuery cible.

Le code JSON suivant décrit un exemple de schéma BigQuery :

{
  "BigQuery Schema": [
    {
      "name": "location",
      "type": "STRING"
    },
    {
      "name": "name",
      "type": "STRING"
    },
    {
      "name": "age",
      "type": "STRING"
    },
    {
      "name": "color",
      "type": "STRING"
    },
    {
      "name": "coffee",
      "type": "STRING"
    }
  ]
}

Schéma de la table d'erreurs

La table BigQuery qui stocke les enregistrements rejetés à partir de fichiers CSV doit correspondre au schéma de table défini ici.

{
  "BigQuery Schema": [
    {
      "name": "RawContent",
      "type": "STRING"
    },
    {
      "name": "ErrorMsg",
      "type": "STRING"
    }
  ]
}

Paramètres de modèle

Paramètres obligatoires

  • inputFilePattern: chemin d'accès Cloud Storage au fichier CSV contenant le texte à traiter. Exemple :gs://your-bucket/path/*.csv
  • schemaJSONPath: chemin d'accès Cloud Storage au fichier JSON qui définit votre schéma BigQuery.
  • outputTable: nom de la table BigQuery qui stocke vos données traitées. Si vous réutilisez une table BigQuery existante, les données sont ajoutées à la table de destination.
  • bigQueryLoadingTemporaryDirectory: répertoire temporaire à utiliser pendant le processus de chargement de BigQuery. Exemple :gs://your-bucket/your-files/temp_dir
  • badRecordsOutputTable: nom de la table BigQuery à utiliser pour stocker les données rejetées lors du traitement des fichiers CSV. Si vous réutilisez une table BigQuery existante, les données sont ajoutées à la table de destination. Le schéma de cette table doit correspondre à celui de la table des erreurs (https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-csv-to-bigquery#GcsCSVToBigQueryBadRecordsSchema).
  • delimiter: séparateur de colonne utilisé par le fichier CSV. Exemple :,
  • csvFormat: format CSV conforme au format CSV Apache Commons. La valeur par défaut est Default.

Paramètres facultatifs

  • containsHeaders: indique si des en-têtes sont inclus dans le fichier CSV. La valeur par défaut est false.
  • csvFileEncoding: format d'encodage des caractères du fichier CSV. Les valeurs autorisées sont US-ASCII, ISO-8859-1, UTF-8 et UTF-16. La valeur par défaut est UTF8.

Exécuter le modèle

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the CSV files on Cloud Storage to BigQuery (Batch) template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/GCS_CSV_to_BigQuery \
    --region REGION_NAME \
    --parameters \
inputFilePattern=PATH_TO_CSV_DATA,\
schemaJSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
outputTable=BIGQUERY_DESTINATION_TABLE,\
badRecordsOutputTable=BIGQUERY_BAD_RECORDS_TABLE,\
csvFormat=CSV_FORMAT,\
delimiter=DELIMITER,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS,\
containsHeaders=CONTAINS_HEADERS,\
csvFileEncoding=CSV_FILE_ENCODING

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • PATH_TO_CSV_DATA : chemin d'accès Cloud Storage vers vos fichiers CSV
  • PATH_TO_BIGQUERY_SCHEMA_JSON : chemin d'accès Cloud Storage au fichier JSON contenant la définition du schéma
  • BIGQUERY_DESTINATION_TABLE : nom de la table de destination BigQuery
  • BIGQUERY_BAD_RECORDS_TABLE : nom de la table des enregistrements BigQuery incorrects
  • PATH_TO_TEMP_DIR_ON_GCS : chemin d'accès Cloud Storage au répertoire temporaire
  • DELIMITER : délimiteur de fichier CSV
  • CSV_FORMAT : spécification du format CSV pour l'analyse des enregistrements.
  • CONTAINS_HEADERS : indique si les fichiers CSV contiennent des en-têtes.
  • CSV_FILE_ENCODING : encodage dans les fichiers CSV

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/GCS_CSV_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern":"PATH_TO_CSV_DATA",
       "schemaJSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "outputTable":"BIGQUERY_DESTINATION_TABLE",
       "badRecordsOutputTable":"BIGQUERY_BAD_RECORDS_TABLE",
       "csvFormat":"CSV_FORMAT",
       "delimiter":"DELIMITER",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS",
       "containsHeaders": "CONTAINS_HEADERS",
       "csvFileEncoding": "CSV_FILE_ENCODING"
   },
   "environment": { "zone": "us-central1-f" }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • PATH_TO_CSV_DATA : chemin d'accès Cloud Storage vers vos fichiers CSV
  • PATH_TO_BIGQUERY_SCHEMA_JSON : chemin d'accès Cloud Storage au fichier JSON contenant la définition du schéma
  • BIGQUERY_DESTINATION_TABLE : nom de la table de destination BigQuery
  • BIGQUERY_BAD_RECORDS_TABLE : nom de la table des enregistrements BigQuery incorrects
  • PATH_TO_TEMP_DIR_ON_GCS : chemin d'accès Cloud Storage au répertoire temporaire
  • DELIMITER : délimiteur de fichier CSV
  • CSV_FORMAT : spécification du format CSV pour l'analyse des enregistrements.
  • CONTAINS_HEADERS : indique si les fichiers CSV contiennent des en-têtes.
  • CSV_FILE_ENCODING : encodage dans les fichiers CSV
Java
/*
 * Copyright (C) 2024 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.templates.CSVToBigQuery.Options;
import com.google.cloud.teleport.templates.common.CsvConverters;
import com.google.cloud.teleport.templates.common.CsvConverters.CsvPipelineOptions;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
import org.json.JSONArray;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Templated pipeline to read CSV files from Cloud Storage, and write it to BigQuery.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_GCS_CSV_to_BigQuery.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "GCS_CSV_to_BigQuery",
    category = TemplateCategory.BATCH,
    displayName = "CSV Files on Cloud Storage to BigQuery",
    description =
        "The Cloud Storage CSV to BigQuery pipeline is a batch pipeline that allows you to read CSV files stored in "
            + "Cloud Storage, and append the result to a BigQuery table. The CSV files can be uncompressed or compressed in formats listed in https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/Compression.html.",
    optionsClass = Options.class,
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "Create a JSON file that describes your {{bigquery_name_short}} schema.\n"
          + "    <p>Ensure that there is a top-level JSON array titled <code>BigQuery Schema</code> and that its\n"
          + "      contents follow the pattern <code>{\"name\": \"COLUMN_NAME\", \"type\": \"DATA_TYPE\"}</code>.</p>\n"
          + "    <p>The following JSON describes an example BigQuery schema:</p>\n"
          + "<pre class=\"prettyprint lang-json\">\n"
          + "{\n"
          + "  \"BigQuery Schema\": [\n"
          + "    {\n"
          + "      \"name\": \"location\",\n"
          + "      \"type\": \"STRING\"\n"
          + "    },\n"
          + "    {\n"
          + "      \"name\": \"name\",\n"
          + "      \"type\": \"STRING\"\n"
          + "    },\n"
          + "    {\n"
          + "      \"name\": \"age\",\n"
          + "      \"type\": \"STRING\"\n"
          + "    },\n"
          + "    {\n"
          + "      \"name\": \"color\",\n"
          + "      \"type\": \"STRING\"\n"
          + "    },\n"
          + "    {\n"
          + "      \"name\": \"coffee\",\n"
          + "      \"type\": \"STRING\"\n"
          + "    }\n"
          + "  ]\n"
          + "}\n"
    })
public class CSVToBigQuery {

  /** Options supported by {@link CSVToBigQuery}. */
  public interface Options extends DataflowPipelineOptions, CsvPipelineOptions {

    @TemplateParameter.Text(
        order = 1,
        groupName = "Source",
        description = "Cloud Storage Input File(s)",
        helpText = "The Cloud Storage path to the CSV file that contains the text to process.",
        regexes = {"^gs:\\/\\/[^\\n\\r]+$"},
        example = "gs://your-bucket/path/*.csv")
    ValueProvider<String> getInputFilePattern();

    void setInputFilePattern(ValueProvider<String> value);

    @TemplateParameter.GcsReadFile(
        order = 2,
        groupName = "Target",
        description = "Cloud Storage location of your BigQuery schema file, described as a JSON",
        helpText = "The Cloud Storage path to the JSON file that defines your BigQuery schema.")
    ValueProvider<String> getSchemaJSONPath();

    void setSchemaJSONPath(ValueProvider<String> value);

    @TemplateParameter.BigQueryTable(
        order = 3,
        groupName = "Target",
        description = "BigQuery output table",
        helpText =
            "The name of the BigQuery table that stores your processed data. If you reuse an existing "
                + "BigQuery table, the data is appended to the destination table.")
    ValueProvider<String> getOutputTable();

    void setOutputTable(ValueProvider<String> value);

    @TemplateParameter.GcsWriteFolder(
        order = 4,
        description = "Temporary directory for BigQuery loading process",
        helpText = "The temporary directory to use during the BigQuery loading process.",
        example = "gs://your-bucket/your-files/temp_dir")
    @Validation.Required
    ValueProvider<String> getBigQueryLoadingTemporaryDirectory();

    void setBigQueryLoadingTemporaryDirectory(ValueProvider<String> directory);

    @TemplateParameter.BigQueryTable(
        order = 5,
        description = "BigQuery output table for bad records",
        helpText =
            "The name of the BigQuery table to use to store the rejected data when processing the"
                + " CSV files. If you reuse an existing BigQuery table, the data is appended to the"
                + " destination table. The schema of this table must match the"
                + " error table schema (https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-csv-to-bigquery#GcsCSVToBigQueryBadRecordsSchema).")
    ValueProvider<String> getBadRecordsOutputTable();

    void setBadRecordsOutputTable(ValueProvider<String> value);
  }

  private static final Logger LOG = LoggerFactory.getLogger(CSVToBigQuery.class);

  private static final String BIGQUERY_SCHEMA = "BigQuery Schema";
  private static final String NAME = "name";
  private static final String TYPE = "type";
  private static final String MODE = "mode";
  private static final String RECORD_TYPE = "RECORD";
  private static final String FIELDS_ENTRY = "fields";

  /** The tag for the headers of the CSV if required. */
  private static final TupleTag<String> CSV_HEADERS = new TupleTag<String>() {};

  /** The tag for the lines of the CSV. */
  private static final TupleTag<String> CSV_LINES = new TupleTag<String>() {};

  /** The tag for the line of the CSV that matches destination table schema. */
  private static final TupleTag<TableRow> GOOD_RECORDS = new TupleTag<TableRow>() {};

  /** The tag for the lines of the CSV that does not match destination table schema. */
  private static final TupleTag<TableRow> BAD_RECORDS = new TupleTag<TableRow>() {};

  /** The schema of the BigQuery table for the bad records. */
  private static final TableSchema errorTableSchema =
      new TableSchema()
          .setFields(
              ImmutableList.of(
                  new TableFieldSchema().setName("RawContent").setType("STRING"),
                  new TableFieldSchema().setName("ErrorMsg").setType("STRING")));

  private static class StringToTableRowFn extends DoFn<String, TableRow> {
    private final ValueProvider<String> delimiter;
    private final NestedValueProvider<List<String>, String> fields;

    public StringToTableRowFn(
        NestedValueProvider<List<String>, String> schemaFields, ValueProvider<String> delimiter) {
      this.delimiter = delimiter;
      this.fields = schemaFields;
    }

    @ProcessElement
    public void processElement(ProcessContext context) throws IllegalArgumentException {
      TableRow outputTableRow = new TableRow();
      String[] rowValue =
          Splitter.on(delimiter.get()).splitToList(context.element()).toArray(new String[0]);
      if (rowValue.length != fields.get().size()) {
        LOG.error("Number of fields in the schema and number of Csv headers do not match.");
        outputTableRow.set("RawContent", String.join(delimiter.get(), rowValue));
        outputTableRow.set(
            "ErrorMsg", "Number of fields in the schema and number of Csv headers do not match.");
        context.output(BAD_RECORDS, outputTableRow);
      } else {
        for (int i = 0; i < fields.get().size(); ++i) {
          outputTableRow.set(fields.get().get(i), rowValue[i]);
        }
        context.output(GOOD_RECORDS, outputTableRow);
      }
    }
  }

  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);

    PCollectionTuple tableRows =
        pipeline
            .apply(
                "ReadCsvFile",
                CsvConverters.ReadCsv.newBuilder()
                    .setInputFileSpec(options.getInputFilePattern())
                    .setHasHeaders(options.getContainsHeaders())
                    .setHeaderTag(CSV_HEADERS)
                    .setLineTag(CSV_LINES)
                    .setCsvFormat(options.getCsvFormat())
                    .setDelimiter(options.getDelimiter())
                    .setFileEncoding(options.getCsvFileEncoding())
                    .build())
            .get(CSV_LINES)
            .apply(
                "ConvertToTableRow",
                ParDo.of(
                        new StringToTableRowFn(
                            NestedValueProvider.of(
                                options.getSchemaJSONPath(),
                                jsonPath -> {
                                  List<String> fields = new ArrayList<>();
                                  SchemaParser schemaParser = new SchemaParser();

                                  try {
                                    JSONObject jsonSchema = schemaParser.parseSchema(jsonPath);
                                    JSONArray bqSchemaJsonArray =
                                        jsonSchema.getJSONArray(BIGQUERY_SCHEMA);

                                    for (int i = 0; i < bqSchemaJsonArray.length(); i++) {
                                      JSONObject inputField = bqSchemaJsonArray.getJSONObject(i);
                                      fields.add(inputField.getString(NAME));
                                    }

                                  } catch (Exception e) {
                                    throw new RuntimeException(
                                        "Error parsing schema " + jsonPath, e);
                                  }
                                  return fields;
                                }),
                            options.getDelimiter()))
                    .withOutputTags(GOOD_RECORDS, TupleTagList.of(BAD_RECORDS)));

    tableRows
        .get(GOOD_RECORDS)
        .apply(
            "Insert good records into Bigquery",
            BigQueryIO.writeTableRows()
                .withSchema(
                    NestedValueProvider.of(
                        options.getSchemaJSONPath(),
                        schemaPath -> {
                          TableSchema tableSchema = new TableSchema();
                          List<TableFieldSchema> fields = new ArrayList<>();
                          SchemaParser schemaParser = new SchemaParser();

                          try {
                            JSONObject jsonSchema = schemaParser.parseSchema(schemaPath);
                            JSONArray bqSchemaJsonArray = jsonSchema.getJSONArray(BIGQUERY_SCHEMA);

                            for (int i = 0; i < bqSchemaJsonArray.length(); i++) {
                              JSONObject inputField = bqSchemaJsonArray.getJSONObject(i);
                              fields.add(convertToTableFieldSchema(inputField));
                            }
                            tableSchema.setFields(fields);

                          } catch (Exception e) {
                            throw new RuntimeException("Error parsing schema " + schemaPath, e);
                          }
                          return tableSchema;
                        }))
                .to(options.getOutputTable())
                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                .withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory()));

    tableRows
        .get(BAD_RECORDS)
        .apply(
            "Insert bad records into Bigquery",
            BigQueryIO.writeTableRows()
                .withSchema(errorTableSchema)
                .to(options.getBadRecordsOutputTable())
                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                .withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory()));

    pipeline.run();
  }

  /**
   * Convert a JSONObject from the Schema JSON to a TableFieldSchema. In case of RECORD, it handles
   * the conversion recursively.
   *
   * @param inputField Input field to convert.
   * @return TableFieldSchema instance to populate the schema.
   */
  private static TableFieldSchema convertToTableFieldSchema(JSONObject inputField) {
    TableFieldSchema field =
        new TableFieldSchema()
            .setName(inputField.getString(NAME))
            .setType(inputField.getString(TYPE));

    if (inputField.has(MODE)) {
      field.setMode(inputField.getString(MODE));
    }

    if (inputField.getString(TYPE) != null && inputField.getString(TYPE).equals(RECORD_TYPE)) {
      List<TableFieldSchema> nestedFields = new ArrayList<>();
      JSONArray fieldsArr = inputField.getJSONArray(FIELDS_ENTRY);
      for (int i = 0; i < fieldsArr.length(); i++) {
        JSONObject nestedJSON = fieldsArr.getJSONObject(i);
        nestedFields.add(convertToTableFieldSchema(nestedJSON));
      }
      field.setFields(nestedFields);
    }

    return field;
  }
}

Étape suivante