Mit Sammlungen den Überblick behalten
Sie können Inhalte basierend auf Ihren Einstellungen speichern und kategorisieren.
Die Pipeline "Cloud Storage-CSV-Dateien für BigQuery" ist eine Batchpipeline, mit der Sie Daten aus in Cloud Storage gespeicherten CSV-Dateien lesen und das Ergebnis an eine BigQuery-Tabelle anhängen können.
Die CSV-Dateien können unkomprimiert oder komprimiert sein und in einem Format vorliegen, das auf der Seite zum Compression Enum SDK aufgeführt ist.
Pipelineanforderungen
Um diese Vorlage verwenden zu können, muss Ihre Pipeline die folgenden Anforderungen erfüllen.
BigQuery-JSON-Schemadatei
Erstellen Sie eine JSON-Datei, die Ihr BigQuery-Schema beschreibt.
Sorgen Sie dafür, dass das Schema ein JSON-Array der obersten Ebene mit dem Namen BigQuery Schema hat und sein Inhalt dem Muster {"name": "COLUMN_NAME", "type": "DATA_TYPE"} folgt.
Die Batchvorlage "Cloud Storage-CSV-Dateien für BigQuery" unterstützt nicht den Import von Daten in Felder des Typs STRUCT (Eintrag) in der BigQuery-Zieltabelle.
Der folgende JSON-Code beschreibt ein BigQuery-Beispielschema:
inputFilePattern : Der Cloud Storage-Pfad zur CSV-Datei, die den zu verarbeitenden Text enthält. Beispiel: gs://Ihr-Bucket/Pfad/*.csv.
schemaJSONPath : Der Cloud Storage-Pfad zur JSON-Datei, die Ihr BigQuery-Schema definiert.
outputTable : Der Name der BigQuery-Tabelle, in der die verarbeiteten Daten gespeichert sind. Wenn Sie eine vorhandene BigQuery-Tabelle wiederverwenden, werden die Daten an die Zieltabelle angehängt.
bigQueryLoadingTemporaryDirectory : Das temporäre Verzeichnis, das beim Laden von BigQuery verwendet wird. Beispiel: gs://Ihr-Bucket/Ihre-Dateien/temp_dir.
Trennzeichen : Das in der CSV-Datei verwendete Spaltentrennzeichen. (Beispiel: ,).
csvFormat : Das CSV-Format gemäß dem Apache Commons CSV-Format. Die Standardeinstellung ist: Standard.
Optionale Parameter
containsHeaders : Gibt an, ob Header in der CSV-Datei enthalten sind. Die Standardeinstellung ist "false".
csvFileEncoding : Das Zeichencodierungsformat für die CSV-Datei. Zulässige Werte sind US-ASCII, ISO-8859-1, UTF-8 und UTF-16. Ist standardmäßig auf UTF-8 eingestellt.
Führen Sie die Vorlage aus.
Console
Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
PATH_TO_CSV_DATA: der Cloud Storage-Pfad zu Ihren CSV-Dateien
PATH_TO_BIGQUERY_SCHEMA_JSON: der Cloud Storage-Pfad zur JSON-Datei, die die Schemadefinition enthält
BIGQUERY_DESTINATION_TABLE: der Name der BigQuery-Zieltabelle
BIGQUERY_BAD_RECORDS_TABLE: Name der BigQuery-Tabelle mit fehlerhaften Einträgen
PATH_TO_TEMP_DIR_ON_GCS: der Cloud Storage-Pfad zum temporären Verzeichnis
DELIMITER: Trennzeichen für CSV-Datei
CSV_FORMAT: CSV-Formatspezifikation zum Parsen von Datensätzen
CONTAINS_HEADERS: Gibt an, ob die CSV-Dateien Header enthalten
CSV_FILE_ENCODING: Codierung in den CSV-Dateien
API
Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.
Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
PATH_TO_CSV_DATA: der Cloud Storage-Pfad zu Ihren CSV-Dateien
PATH_TO_BIGQUERY_SCHEMA_JSON: der Cloud Storage-Pfad zur JSON-Datei, die die Schemadefinition enthält
BIGQUERY_DESTINATION_TABLE: der Name der BigQuery-Zieltabelle
BIGQUERY_BAD_RECORDS_TABLE: Name der BigQuery-Tabelle mit fehlerhaften Einträgen
PATH_TO_TEMP_DIR_ON_GCS: der Cloud Storage-Pfad zum temporären Verzeichnis
DELIMITER: Trennzeichen für CSV-Datei
CSV_FORMAT: CSV-Formatspezifikation zum Parsen von Datensätzen
CONTAINS_HEADERS: Gibt an, ob die CSV-Dateien Header enthalten
CSV_FILE_ENCODING: Codierung in den CSV-Dateien
Quellcode der Vorlage
Java
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.templates.CSVToBigQuery.Options;
import com.google.cloud.teleport.templates.common.CsvConverters;
import com.google.cloud.teleport.templates.common.CsvConverters.CsvPipelineOptions;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
import org.json.JSONArray;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Templated pipeline to read CSV files from Cloud Storage, and write it to BigQuery.
*
* <p>Check out <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_GCS_CSV_to_BigQuery.md">README</a>
* for instructions on how to use or modify this template.
*/
@Template(
name = "GCS_CSV_to_BigQuery",
category = TemplateCategory.BATCH,
displayName = "CSV Files on Cloud Storage to BigQuery",
description =
"The Cloud Storage CSV to BigQuery pipeline is a batch pipeline that allows you to read CSV files stored in "
+ "Cloud Storage, and append the result to a BigQuery table. The CSV files can be uncompressed or compressed in formats listed in https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/Compression.html.",
optionsClass = Options.class,
contactInformation = "https://cloud.google.com/support",
requirements = {
"Create a JSON file that describes your {{bigquery_name_short}} schema.\n"
+ " <p>Ensure that there is a top-level JSON array titled <code>BigQuery Schema</code> and that its\n"
+ " contents follow the pattern <code>{\"name\": \"COLUMN_NAME\", \"type\": \"DATA_TYPE\"}</code>.</p>\n"
+ " <p>The following JSON describes an example BigQuery schema:</p>\n"
+ "<pre class=\"prettyprint lang-json\">\n"
+ "{\n"
+ " \"BigQuery Schema\": [\n"
+ " {\n"
+ " \"name\": \"location\",\n"
+ " \"type\": \"STRING\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"name\",\n"
+ " \"type\": \"STRING\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"age\",\n"
+ " \"type\": \"STRING\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"color\",\n"
+ " \"type\": \"STRING\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"coffee\",\n"
+ " \"type\": \"STRING\"\n"
+ " }\n"
+ " ]\n"
+ "}\n"
})
public class CSVToBigQuery {
/** Options supported by {@link CSVToBigQuery}. */
public interface Options extends DataflowPipelineOptions, CsvPipelineOptions {
@TemplateParameter.Text(
order = 1,
groupName = "Source",
description = "Cloud Storage Input File(s)",
helpText = "The Cloud Storage path to the CSV file that contains the text to process.",
regexes = {"^gs:\\/\\/[^\\n\\r]+$"},
example = "gs://your-bucket/path/*.csv")
ValueProvider<String> getInputFilePattern();
void setInputFilePattern(ValueProvider<String> value);
@TemplateParameter.GcsReadFile(
order = 2,
groupName = "Target",
description = "Cloud Storage location of your BigQuery schema file, described as a JSON",
helpText = "The Cloud Storage path to the JSON file that defines your BigQuery schema.")
ValueProvider<String> getSchemaJSONPath();
void setSchemaJSONPath(ValueProvider<String> value);
@TemplateParameter.BigQueryTable(
order = 3,
groupName = "Target",
description = "BigQuery output table",
helpText =
"The name of the BigQuery table that stores your processed data. If you reuse an existing "
+ "BigQuery table, the data is appended to the destination table.")
ValueProvider<String> getOutputTable();
void setOutputTable(ValueProvider<String> value);
@TemplateParameter.GcsWriteFolder(
order = 4,
description = "Temporary directory for BigQuery loading process",
helpText = "The temporary directory to use during the BigQuery loading process.",
example = "gs://your-bucket/your-files/temp_dir")
@Validation.Required
ValueProvider<String> getBigQueryLoadingTemporaryDirectory();
void setBigQueryLoadingTemporaryDirectory(ValueProvider<String> directory);
@TemplateParameter.BigQueryTable(
order = 5,
description = "BigQuery output table for bad records",
helpText =
"The name of the BigQuery table to use to store the rejected data when processing the"
+ " CSV files. If you reuse an existing BigQuery table, the data is appended to the"
+ " destination table. The schema of this table must match the"
+ " error table schema (https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-csv-to-bigquery#GcsCSVToBigQueryBadRecordsSchema).")
ValueProvider<String> getBadRecordsOutputTable();
void setBadRecordsOutputTable(ValueProvider<String> value);
}
private static final Logger LOG = LoggerFactory.getLogger(CSVToBigQuery.class);
private static final String BIGQUERY_SCHEMA = "BigQuery Schema";
private static final String NAME = "name";
private static final String TYPE = "type";
private static final String MODE = "mode";
private static final String RECORD_TYPE = "RECORD";
private static final String FIELDS_ENTRY = "fields";
/** The tag for the headers of the CSV if required. */
private static final TupleTag<String> CSV_HEADERS = new TupleTag<String>() {};
/** The tag for the lines of the CSV. */
private static final TupleTag<String> CSV_LINES = new TupleTag<String>() {};
/** The tag for the line of the CSV that matches destination table schema. */
private static final TupleTag<TableRow> GOOD_RECORDS = new TupleTag<TableRow>() {};
/** The tag for the lines of the CSV that does not match destination table schema. */
private static final TupleTag<TableRow> BAD_RECORDS = new TupleTag<TableRow>() {};
/** The schema of the BigQuery table for the bad records. */
private static final TableSchema errorTableSchema =
new TableSchema()
.setFields(
ImmutableList.of(
new TableFieldSchema().setName("RawContent").setType("STRING"),
new TableFieldSchema().setName("ErrorMsg").setType("STRING")));
private static class StringToTableRowFn extends DoFn<String, TableRow> {
private final ValueProvider<String> delimiter;
private final NestedValueProvider<List<String>, String> fields;
public StringToTableRowFn(
NestedValueProvider<List<String>, String> schemaFields, ValueProvider<String> delimiter) {
this.delimiter = delimiter;
this.fields = schemaFields;
}
@ProcessElement
public void processElement(ProcessContext context) throws IllegalArgumentException {
TableRow outputTableRow = new TableRow();
String[] rowValue =
Splitter.on(delimiter.get()).splitToList(context.element()).toArray(new String[0]);
if (rowValue.length != fields.get().size()) {
LOG.error("Number of fields in the schema and number of Csv headers do not match.");
outputTableRow.set("RawContent", String.join(delimiter.get(), rowValue));
outputTableRow.set(
"ErrorMsg", "Number of fields in the schema and number of Csv headers do not match.");
context.output(BAD_RECORDS, outputTableRow);
} else {
for (int i = 0; i < fields.get().size(); ++i) {
outputTableRow.set(fields.get().get(i), rowValue[i]);
}
context.output(GOOD_RECORDS, outputTableRow);
}
}
}
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline pipeline = Pipeline.create(options);
PCollectionTuple tableRows =
pipeline
.apply(
"ReadCsvFile",
CsvConverters.ReadCsv.newBuilder()
.setInputFileSpec(options.getInputFilePattern())
.setHasHeaders(options.getContainsHeaders())
.setHeaderTag(CSV_HEADERS)
.setLineTag(CSV_LINES)
.setCsvFormat(options.getCsvFormat())
.setDelimiter(options.getDelimiter())
.setFileEncoding(options.getCsvFileEncoding())
.build())
.get(CSV_LINES)
.apply(
"ConvertToTableRow",
ParDo.of(
new StringToTableRowFn(
NestedValueProvider.of(
options.getSchemaJSONPath(),
jsonPath -> {
List<String> fields = new ArrayList<>();
SchemaParser schemaParser = new SchemaParser();
try {
JSONObject jsonSchema = schemaParser.parseSchema(jsonPath);
JSONArray bqSchemaJsonArray =
jsonSchema.getJSONArray(BIGQUERY_SCHEMA);
for (int i = 0; i < bqSchemaJsonArray.length(); i++) {
JSONObject inputField = bqSchemaJsonArray.getJSONObject(i);
fields.add(inputField.getString(NAME));
}
} catch (Exception e) {
throw new RuntimeException(
"Error parsing schema " + jsonPath, e);
}
return fields;
}),
options.getDelimiter()))
.withOutputTags(GOOD_RECORDS, TupleTagList.of(BAD_RECORDS)));
tableRows
.get(GOOD_RECORDS)
.apply(
"Insert good records into Bigquery",
BigQueryIO.writeTableRows()
.withSchema(
NestedValueProvider.of(
options.getSchemaJSONPath(),
schemaPath -> {
TableSchema tableSchema = new TableSchema();
List<TableFieldSchema> fields = new ArrayList<>();
SchemaParser schemaParser = new SchemaParser();
try {
JSONObject jsonSchema = schemaParser.parseSchema(schemaPath);
JSONArray bqSchemaJsonArray = jsonSchema.getJSONArray(BIGQUERY_SCHEMA);
for (int i = 0; i < bqSchemaJsonArray.length(); i++) {
JSONObject inputField = bqSchemaJsonArray.getJSONObject(i);
fields.add(convertToTableFieldSchema(inputField));
}
tableSchema.setFields(fields);
} catch (Exception e) {
throw new RuntimeException("Error parsing schema " + schemaPath, e);
}
return tableSchema;
}))
.to(options.getOutputTable())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory()));
tableRows
.get(BAD_RECORDS)
.apply(
"Insert bad records into Bigquery",
BigQueryIO.writeTableRows()
.withSchema(errorTableSchema)
.to(options.getBadRecordsOutputTable())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory()));
pipeline.run();
}
/**
* Convert a JSONObject from the Schema JSON to a TableFieldSchema. In case of RECORD, it handles
* the conversion recursively.
*
* @param inputField Input field to convert.
* @return TableFieldSchema instance to populate the schema.
*/
private static TableFieldSchema convertToTableFieldSchema(JSONObject inputField) {
TableFieldSchema field =
new TableFieldSchema()
.setName(inputField.getString(NAME))
.setType(inputField.getString(TYPE));
if (inputField.has(MODE)) {
field.setMode(inputField.getString(MODE));
}
if (inputField.getString(TYPE) != null && inputField.getString(TYPE).equals(RECORD_TYPE)) {
List<TableFieldSchema> nestedFields = new ArrayList<>();
JSONArray fieldsArr = inputField.getJSONArray(FIELDS_ENTRY);
for (int i = 0; i < fieldsArr.length(); i++) {
JSONObject nestedJSON = fieldsArr.getJSONObject(i);
nestedFields.add(convertToTableFieldSchema(nestedJSON));
}
field.setFields(nestedFields);
}
return field;
}
}
[[["Leicht verständlich","easyToUnderstand","thumb-up"],["Mein Problem wurde gelöst","solvedMyProblem","thumb-up"],["Sonstiges","otherUp","thumb-up"]],[["Hard to understand","hardToUnderstand","thumb-down"],["Incorrect information or sample code","incorrectInformationOrSampleCode","thumb-down"],["Missing the information/samples I need","missingTheInformationSamplesINeed","thumb-down"],["Problem mit der Übersetzung","translationIssue","thumb-down"],["Sonstiges","otherDown","thumb-down"]],["Zuletzt aktualisiert: 2024-07-11 (UTC)."],[],[]]