Restez organisé à l'aide des collections
Enregistrez et classez les contenus selon vos préférences.
Le pipeline de fichiers CSV Cloud Storage vers BigQuery est un pipeline par lots qui vous permet de lire des données à partir de fichiers CSV stockés dans Cloud Storage et d'ajouter le résultat à une table BigQuery.
Les fichiers CSV peuvent être non compressés ou compressés dans les formats répertoriés sur la page du SDK Enum Compression.
Conditions requises pour ce pipeline
Pour utiliser ce modèle, votre pipeline doit répondre aux exigences suivantes.
Fichier JSON de schéma BigQuery
Créez un fichier JSON décrivant votre schéma BigQuery.
Assurez-vous que le schéma comporte un tableau JSON de niveau supérieur intitulé BigQuery Schema et que son contenu suit le modèle {"name": "COLUMN_NAME", "type": "DATA_TYPE"}.
Le modèle de traitement par lots des fichiers CSV Cloud Storage vers BigQuery n'est pas compatible avec l'importation de données dans les champs STRUCT (Enregistrement) de la table BigQuery cible.
Le code JSON suivant décrit un exemple de schéma BigQuery :
inputFilePattern : chemin d'accès Cloud Storage au fichier CSV contenant le texte à traiter. (Exemple: gs://your-bucket/path/*.csv).
schemaJSONPath : chemin d'accès Cloud Storage au fichier JSON qui définit votre schéma BigQuery.
outputTable : nom de la table BigQuery qui stocke vos données traitées. Si vous réutilisez une table BigQuery existante, les données sont ajoutées à la table de destination.
bigQueryLoadingTemporaryDirectory : répertoire temporaire à utiliser lors du processus de chargement de BigQuery. (par exemple, gs://your-bucket/your-files/temp_dir).
delimiter : délimiteur de colonne utilisé par le fichier CSV. (Exemple: ,).
csvFormat : format CSV conforme au format CSV Apache Commons. La valeur par défaut est Default.
Paramètres facultatifs
containsHeaders : indique si les en-têtes sont inclus dans le fichier CSV. La valeur par défaut est "false".
csvFileEncoding : format d'encodage des caractères du fichier CSV. Les valeurs autorisées sont US-ASCII, ISO-8859-1, UTF-8 et UTF-16. La valeur par défaut est UTF8.
Exécuter le modèle
Console
Accédez à la page Dataflow Créer un job à partir d'un modèle.
Le nom de la version, par exemple 2023-09-12-00_RC00, pour utiliser une version spécifique du modèle, qui est imbriqué dans le dossier parent daté respectif dans le bucket : gs://dataflow-templates-REGION_NAME/
REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
PATH_TO_CSV_DATA : chemin d'accès Cloud Storage vers vos fichiers CSV
PATH_TO_BIGQUERY_SCHEMA_JSON : chemin d'accès Cloud Storage au fichier JSON contenant la définition du schéma
BIGQUERY_DESTINATION_TABLE : nom de la table de destination BigQuery
BIGQUERY_BAD_RECORDS_TABLE : nom de la table des enregistrements BigQuery incorrects
PATH_TO_TEMP_DIR_ON_GCS : chemin d'accès Cloud Storage au répertoire temporaire
DELIMITER : délimiteur de fichier CSV
CSV_FORMAT : spécification du format CSV pour l'analyse des enregistrements.
CONTAINS_HEADERS : indique si les fichiers CSV contiennent des en-têtes.
CSV_FILE_ENCODING : encodage dans les fichiers CSV
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.
Le nom de la version, par exemple 2023-09-12-00_RC00, pour utiliser une version spécifique du modèle, qui est imbriqué dans le dossier parent daté respectif dans le bucket : gs://dataflow-templates-REGION_NAME/
LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
PATH_TO_CSV_DATA : chemin d'accès Cloud Storage vers vos fichiers CSV
PATH_TO_BIGQUERY_SCHEMA_JSON : chemin d'accès Cloud Storage au fichier JSON contenant la définition du schéma
BIGQUERY_DESTINATION_TABLE : nom de la table de destination BigQuery
BIGQUERY_BAD_RECORDS_TABLE : nom de la table des enregistrements BigQuery incorrects
PATH_TO_TEMP_DIR_ON_GCS : chemin d'accès Cloud Storage au répertoire temporaire
DELIMITER : délimiteur de fichier CSV
CSV_FORMAT : spécification du format CSV pour l'analyse des enregistrements.
CONTAINS_HEADERS : indique si les fichiers CSV contiennent des en-têtes.
CSV_FILE_ENCODING : encodage dans les fichiers CSV
Code source du modèle
Java
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.templates.CSVToBigQuery.Options;
import com.google.cloud.teleport.templates.common.CsvConverters;
import com.google.cloud.teleport.templates.common.CsvConverters.CsvPipelineOptions;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
import org.json.JSONArray;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Templated pipeline to read CSV files from Cloud Storage, and write it to BigQuery.
*
* <p>Check out <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_GCS_CSV_to_BigQuery.md">README</a>
* for instructions on how to use or modify this template.
*/
@Template(
name = "GCS_CSV_to_BigQuery",
category = TemplateCategory.BATCH,
displayName = "CSV Files on Cloud Storage to BigQuery",
description =
"The Cloud Storage CSV to BigQuery pipeline is a batch pipeline that allows you to read CSV files stored in "
+ "Cloud Storage, and append the result to a BigQuery table. The CSV files can be uncompressed or compressed in formats listed in https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/Compression.html.",
optionsClass = Options.class,
contactInformation = "https://cloud.google.com/support",
requirements = {
"Create a JSON file that describes your {{bigquery_name_short}} schema.\n"
+ " <p>Ensure that there is a top-level JSON array titled <code>BigQuery Schema</code> and that its\n"
+ " contents follow the pattern <code>{\"name\": \"COLUMN_NAME\", \"type\": \"DATA_TYPE\"}</code>.</p>\n"
+ " <p>The following JSON describes an example BigQuery schema:</p>\n"
+ "<pre class=\"prettyprint lang-json\">\n"
+ "{\n"
+ " \"BigQuery Schema\": [\n"
+ " {\n"
+ " \"name\": \"location\",\n"
+ " \"type\": \"STRING\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"name\",\n"
+ " \"type\": \"STRING\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"age\",\n"
+ " \"type\": \"STRING\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"color\",\n"
+ " \"type\": \"STRING\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"coffee\",\n"
+ " \"type\": \"STRING\"\n"
+ " }\n"
+ " ]\n"
+ "}\n"
})
public class CSVToBigQuery {
/** Options supported by {@link CSVToBigQuery}. */
public interface Options extends DataflowPipelineOptions, CsvPipelineOptions {
@TemplateParameter.Text(
order = 1,
groupName = "Source",
description = "Cloud Storage Input File(s)",
helpText = "The Cloud Storage path to the CSV file that contains the text to process.",
regexes = {"^gs:\\/\\/[^\\n\\r]+$"},
example = "gs://your-bucket/path/*.csv")
ValueProvider<String> getInputFilePattern();
void setInputFilePattern(ValueProvider<String> value);
@TemplateParameter.GcsReadFile(
order = 2,
groupName = "Target",
description = "Cloud Storage location of your BigQuery schema file, described as a JSON",
helpText = "The Cloud Storage path to the JSON file that defines your BigQuery schema.")
ValueProvider<String> getSchemaJSONPath();
void setSchemaJSONPath(ValueProvider<String> value);
@TemplateParameter.BigQueryTable(
order = 3,
groupName = "Target",
description = "BigQuery output table",
helpText =
"The name of the BigQuery table that stores your processed data. If you reuse an existing "
+ "BigQuery table, the data is appended to the destination table.")
ValueProvider<String> getOutputTable();
void setOutputTable(ValueProvider<String> value);
@TemplateParameter.GcsWriteFolder(
order = 4,
description = "Temporary directory for BigQuery loading process",
helpText = "The temporary directory to use during the BigQuery loading process.",
example = "gs://your-bucket/your-files/temp_dir")
@Validation.Required
ValueProvider<String> getBigQueryLoadingTemporaryDirectory();
void setBigQueryLoadingTemporaryDirectory(ValueProvider<String> directory);
@TemplateParameter.BigQueryTable(
order = 5,
description = "BigQuery output table for bad records",
helpText =
"The name of the BigQuery table to use to store the rejected data when processing the"
+ " CSV files. If you reuse an existing BigQuery table, the data is appended to the"
+ " destination table. The schema of this table must match the"
+ " error table schema (https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-csv-to-bigquery#GcsCSVToBigQueryBadRecordsSchema).")
ValueProvider<String> getBadRecordsOutputTable();
void setBadRecordsOutputTable(ValueProvider<String> value);
}
private static final Logger LOG = LoggerFactory.getLogger(CSVToBigQuery.class);
private static final String BIGQUERY_SCHEMA = "BigQuery Schema";
private static final String NAME = "name";
private static final String TYPE = "type";
private static final String MODE = "mode";
private static final String RECORD_TYPE = "RECORD";
private static final String FIELDS_ENTRY = "fields";
/** The tag for the headers of the CSV if required. */
private static final TupleTag<String> CSV_HEADERS = new TupleTag<String>() {};
/** The tag for the lines of the CSV. */
private static final TupleTag<String> CSV_LINES = new TupleTag<String>() {};
/** The tag for the line of the CSV that matches destination table schema. */
private static final TupleTag<TableRow> GOOD_RECORDS = new TupleTag<TableRow>() {};
/** The tag for the lines of the CSV that does not match destination table schema. */
private static final TupleTag<TableRow> BAD_RECORDS = new TupleTag<TableRow>() {};
/** The schema of the BigQuery table for the bad records. */
private static final TableSchema errorTableSchema =
new TableSchema()
.setFields(
ImmutableList.of(
new TableFieldSchema().setName("RawContent").setType("STRING"),
new TableFieldSchema().setName("ErrorMsg").setType("STRING")));
private static class StringToTableRowFn extends DoFn<String, TableRow> {
private final ValueProvider<String> delimiter;
private final NestedValueProvider<List<String>, String> fields;
public StringToTableRowFn(
NestedValueProvider<List<String>, String> schemaFields, ValueProvider<String> delimiter) {
this.delimiter = delimiter;
this.fields = schemaFields;
}
@ProcessElement
public void processElement(ProcessContext context) throws IllegalArgumentException {
TableRow outputTableRow = new TableRow();
String[] rowValue =
Splitter.on(delimiter.get()).splitToList(context.element()).toArray(new String[0]);
if (rowValue.length != fields.get().size()) {
LOG.error("Number of fields in the schema and number of Csv headers do not match.");
outputTableRow.set("RawContent", String.join(delimiter.get(), rowValue));
outputTableRow.set(
"ErrorMsg", "Number of fields in the schema and number of Csv headers do not match.");
context.output(BAD_RECORDS, outputTableRow);
} else {
for (int i = 0; i < fields.get().size(); ++i) {
outputTableRow.set(fields.get().get(i), rowValue[i]);
}
context.output(GOOD_RECORDS, outputTableRow);
}
}
}
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline pipeline = Pipeline.create(options);
PCollectionTuple tableRows =
pipeline
.apply(
"ReadCsvFile",
CsvConverters.ReadCsv.newBuilder()
.setInputFileSpec(options.getInputFilePattern())
.setHasHeaders(options.getContainsHeaders())
.setHeaderTag(CSV_HEADERS)
.setLineTag(CSV_LINES)
.setCsvFormat(options.getCsvFormat())
.setDelimiter(options.getDelimiter())
.setFileEncoding(options.getCsvFileEncoding())
.build())
.get(CSV_LINES)
.apply(
"ConvertToTableRow",
ParDo.of(
new StringToTableRowFn(
NestedValueProvider.of(
options.getSchemaJSONPath(),
jsonPath -> {
List<String> fields = new ArrayList<>();
SchemaParser schemaParser = new SchemaParser();
try {
JSONObject jsonSchema = schemaParser.parseSchema(jsonPath);
JSONArray bqSchemaJsonArray =
jsonSchema.getJSONArray(BIGQUERY_SCHEMA);
for (int i = 0; i < bqSchemaJsonArray.length(); i++) {
JSONObject inputField = bqSchemaJsonArray.getJSONObject(i);
fields.add(inputField.getString(NAME));
}
} catch (Exception e) {
throw new RuntimeException(
"Error parsing schema " + jsonPath, e);
}
return fields;
}),
options.getDelimiter()))
.withOutputTags(GOOD_RECORDS, TupleTagList.of(BAD_RECORDS)));
tableRows
.get(GOOD_RECORDS)
.apply(
"Insert good records into Bigquery",
BigQueryIO.writeTableRows()
.withSchema(
NestedValueProvider.of(
options.getSchemaJSONPath(),
schemaPath -> {
TableSchema tableSchema = new TableSchema();
List<TableFieldSchema> fields = new ArrayList<>();
SchemaParser schemaParser = new SchemaParser();
try {
JSONObject jsonSchema = schemaParser.parseSchema(schemaPath);
JSONArray bqSchemaJsonArray = jsonSchema.getJSONArray(BIGQUERY_SCHEMA);
for (int i = 0; i < bqSchemaJsonArray.length(); i++) {
JSONObject inputField = bqSchemaJsonArray.getJSONObject(i);
fields.add(convertToTableFieldSchema(inputField));
}
tableSchema.setFields(fields);
} catch (Exception e) {
throw new RuntimeException("Error parsing schema " + schemaPath, e);
}
return tableSchema;
}))
.to(options.getOutputTable())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory()));
tableRows
.get(BAD_RECORDS)
.apply(
"Insert bad records into Bigquery",
BigQueryIO.writeTableRows()
.withSchema(errorTableSchema)
.to(options.getBadRecordsOutputTable())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory()));
pipeline.run();
}
/**
* Convert a JSONObject from the Schema JSON to a TableFieldSchema. In case of RECORD, it handles
* the conversion recursively.
*
* @param inputField Input field to convert.
* @return TableFieldSchema instance to populate the schema.
*/
private static TableFieldSchema convertToTableFieldSchema(JSONObject inputField) {
TableFieldSchema field =
new TableFieldSchema()
.setName(inputField.getString(NAME))
.setType(inputField.getString(TYPE));
if (inputField.has(MODE)) {
field.setMode(inputField.getString(MODE));
}
if (inputField.getString(TYPE) != null && inputField.getString(TYPE).equals(RECORD_TYPE)) {
List<TableFieldSchema> nestedFields = new ArrayList<>();
JSONArray fieldsArr = inputField.getJSONArray(FIELDS_ENTRY);
for (int i = 0; i < fieldsArr.length(); i++) {
JSONObject nestedJSON = fieldsArr.getJSONObject(i);
nestedFields.add(convertToTableFieldSchema(nestedJSON));
}
field.setFields(nestedFields);
}
return field;
}
}
Sauf indication contraire, le contenu de cette page est régi par une licence Creative Commons Attribution 4.0, et les échantillons de code sont régis par une licence Apache 2.0. Pour en savoir plus, consultez les Règles du site Google Developers. Java est une marque déposée d'Oracle et/ou de ses sociétés affiliées.
Dernière mise à jour le 2024/08/02 (UTC).
[[["Facile à comprendre","easyToUnderstand","thumb-up"],["J'ai pu résoudre mon problème","solvedMyProblem","thumb-up"],["Autre","otherUp","thumb-up"]],[["Hard to understand","hardToUnderstand","thumb-down"],["Incorrect information or sample code","incorrectInformationOrSampleCode","thumb-down"],["Missing the information/samples I need","missingTheInformationSamplesINeed","thumb-down"],["Problème de traduction","translationIssue","thumb-down"],["Autre","otherDown","thumb-down"]],["Dernière mise à jour le 2024/08/02 (UTC)."],[],[]]