Vorlage „Cloud Storage-CSV-Dateien für BigQuery“

Die Pipeline „Cloud Storage-CSV-Dateien für BigQuery“ ist eine Batchpipeline, mit der Sie Daten aus in Cloud Storage gespeicherten CSV-Dateien lesen und das Ergebnis an eine BigQuery-Tabelle anhängen können. Die CSV-Dateien können unkomprimiert oder in einem der Formate komprimiert sein, die auf der Seite CompressionEnum SDK aufgeführt sind.


Damit Sie diese Vorlage verwenden können, muss Ihre Pipeline die folgenden Anforderungen erfüllen.


Erstellen Sie eine JSON-Datei, die Ihr BigQuery-Schema beschreibt. Das Schema muss ein JSON-Array der obersten Ebene mit dem Namen BigQuery Schema enthalten, dessen Inhalt dem Muster {"name": "COLUMN_NAME", "type": "DATA_TYPE"} folgt.

Die Batchvorlage "Cloud Storage-CSV-Dateien für BigQuery" unterstützt nicht den Import von Daten in Felder des Typs STRUCT (Eintrag) in der BigQuery-Zieltabelle.

Der folgende JSON-Code beschreibt ein BigQuery-Beispielschema:

  "BigQuery Schema": [
      "name": "location",
      "type": "STRING"
      "name": "name",
      "type": "STRING"
      "name": "age",
      "type": "STRING"
      "name": "color",
      "type": "STRING"
      "name": "coffee",
      "type": "STRING"


Die BigQuery-Tabelle, in der die abgelehnten Datensätze aus CSV-Dateien gespeichert werden, muss mit dem hier definierten Tabellenschema übereinstimmen.

  "BigQuery Schema": [
      "name": "RawContent",
      "type": "STRING"
      "name": "ErrorMsg",
      "type": "STRING"


Erforderliche Parameter

  • inputFilePattern: Der Cloud Storage-Pfad zur CSV-Datei mit dem zu verarbeitenden Text. Beispiel: gs://your-bucket/path/*.csv.
  • schemaJSONPath: Der Cloud Storage-Pfad zur JSON-Datei, die Ihr BigQuery-Schema definiert.
  • outputTable: Der Name der BigQuery-Tabelle, in der die verarbeiteten Daten gespeichert werden. Wenn Sie eine vorhandene BigQuery-Tabelle wiederverwenden, werden die Daten an die Zieltabelle angehängt.
  • bigQueryLoadingTemporaryDirectory: Das temporäre Verzeichnis, das während des BigQuery-Ladevorgangs verwendet werden soll. Beispiel: gs://your-bucket/your-files/temp_dir.
  • badRecordsOutputTable: Der Name der BigQuery-Tabelle, in der die abgelehnten Daten bei der Verarbeitung der CSV-Dateien gespeichert werden sollen. Wenn Sie eine vorhandene BigQuery-Tabelle wiederverwenden, werden die Daten an die Zieltabelle angehängt. Das Schema dieser Tabelle muss mit dem Fehlertabellenschema (https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-csv-to-bigquery#GcsCSVToBigQueryBadRecordsSchema) übereinstimmen.
  • delimiter: Das Spaltentrennzeichen, das in der CSV-Datei verwendet wird. Beispiel: ,.
  • csvFormat: Das CSV-Format gemäß dem Apache Commons CSV-Format. Die Standardeinstellung ist Default.

Optionale Parameter

  • containsHeaders: Gibt an, ob Header in der CSV-Datei enthalten sind. Die Standardeinstellung ist false.
  • csvFileEncoding: Das Zeichencodierungsformat der CSV-Datei. Zulässige Werte sind US-ASCII, ISO-8859-1, UTF-8 und UTF-16. Standardmäßig ist dies auf UTF8 eingestellt.

Führen Sie die Vorlage aus.

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the CSV files on Cloud Storage to BigQuery (Batch) templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/GCS_CSV_to_BigQuery \
    --region REGION_NAME \
    --parameters \

Ersetzen Sie dabei Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • PATH_TO_CSV_DATA: der Cloud Storage-Pfad zu Ihren CSV-Dateien
  • PATH_TO_BIGQUERY_SCHEMA_JSON: der Cloud Storage-Pfad zur JSON-Datei, die die Schemadefinition enthält
  • BIGQUERY_DESTINATION_TABLE: Name der BigQuery-Zieltabelle
  • BIGQUERY_BAD_RECORDS_TABLE: der Name der BigQuery-Tabelle mit den fehlerhaften Datensätzen
  • PATH_TO_TEMP_DIR_ON_GCS: der Cloud Storage-Pfad zum temporären Verzeichnis
  • DELIMITER: Trennzeichen der CSV-Datei
  • CSV_FORMAT: CSV-Formatspezifikation zum Parsen von Einträgen
  • CONTAINS_HEADERS: ob die CSV-Dateien Header enthalten
  • CSV_FILE_ENCODING: Codierung in den CSV-Dateien

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/GCS_CSV_to_BigQuery
   "jobName": "JOB_NAME",
   "parameters": {
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS",
       "containsHeaders": "CONTAINS_HEADERS",
       "csvFileEncoding": "CSV_FILE_ENCODING"
   "environment": { "zone": "us-central1-f" }

Ersetzen Sie dabei Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • PATH_TO_CSV_DATA: der Cloud Storage-Pfad zu Ihren CSV-Dateien
  • PATH_TO_BIGQUERY_SCHEMA_JSON: der Cloud Storage-Pfad zur JSON-Datei, die die Schemadefinition enthält
  • BIGQUERY_DESTINATION_TABLE: Name der BigQuery-Zieltabelle
  • BIGQUERY_BAD_RECORDS_TABLE: der Name der BigQuery-Tabelle mit den fehlerhaften Datensätzen
  • PATH_TO_TEMP_DIR_ON_GCS: der Cloud Storage-Pfad zum temporären Verzeichnis
  • DELIMITER: Trennzeichen der CSV-Datei
  • CSV_FORMAT: CSV-Formatspezifikation zum Parsen von Einträgen
  • CONTAINS_HEADERS: ob die CSV-Dateien Header enthalten
  • CSV_FILE_ENCODING: Codierung in den CSV-Dateien
package com.google.cloud.teleport.templates;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.templates.CSVToBigQuery.Options;
import com.google.cloud.teleport.templates.common.CsvConverters;
import com.google.cloud.teleport.templates.common.CsvConverters.CsvPipelineOptions;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
import org.json.JSONArray;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

 * Templated pipeline to read CSV files from Cloud Storage, and write it to BigQuery.
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_GCS_CSV_to_BigQuery.md">README</a>
 * for instructions on how to use or modify this template.
    name = "GCS_CSV_to_BigQuery",
    category = TemplateCategory.BATCH,
    displayName = "CSV Files on Cloud Storage to BigQuery",
    description =
        "The Cloud Storage CSV to BigQuery pipeline is a batch pipeline that allows you to read CSV files stored in "
            + "Cloud Storage, and append the result to a BigQuery table. The CSV files can be uncompressed or compressed in formats listed in https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/Compression.html.",
    optionsClass = Options.class,
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "Create a JSON file that describes your {{bigquery_name_short}} schema.\n"
          + "    <p>Ensure that there is a top-level JSON array titled <code>BigQuery Schema</code> and that its\n"
          + "      contents follow the pattern <code>{\"name\": \"COLUMN_NAME\", \"type\": \"DATA_TYPE\"}</code>.</p>\n"
          + "    <p>The following JSON describes an example BigQuery schema:</p>\n"
          + "<pre class=\"prettyprint lang-json\">\n"
          + "{\n"
          + "  \"BigQuery Schema\": [\n"
          + "    {\n"
          + "      \"name\": \"location\",\n"
          + "      \"type\": \"STRING\"\n"
          + "    },\n"
          + "    {\n"
          + "      \"name\": \"name\",\n"
          + "      \"type\": \"STRING\"\n"
          + "    },\n"
          + "    {\n"
          + "      \"name\": \"age\",\n"
          + "      \"type\": \"STRING\"\n"
          + "    },\n"
          + "    {\n"
          + "      \"name\": \"color\",\n"
          + "      \"type\": \"STRING\"\n"
          + "    },\n"
          + "    {\n"
          + "      \"name\": \"coffee\",\n"
          + "      \"type\": \"STRING\"\n"
          + "    }\n"
          + "  ]\n"
          + "}\n"
public class CSVToBigQuery {

  /** Options supported by {@link CSVToBigQuery}. */
  public interface Options extends DataflowPipelineOptions, CsvPipelineOptions {

        order = 1,
        groupName = "Source",
        description = "Cloud Storage Input File(s)",
        helpText = "The Cloud Storage path to the CSV file that contains the text to process.",
        regexes = {"^gs:\\/\\/[^\\n\\r]+$"},
        example = "gs://your-bucket/path/*.csv")
    ValueProvider<String> getInputFilePattern();

    void setInputFilePattern(ValueProvider<String> value);

        order = 2,
        groupName = "Target",
        description = "Cloud Storage location of your BigQuery schema file, described as a JSON",
        helpText = "The Cloud Storage path to the JSON file that defines your BigQuery schema.")
    ValueProvider<String> getSchemaJSONPath();

    void setSchemaJSONPath(ValueProvider<String> value);

        order = 3,
        groupName = "Target",
        description = "BigQuery output table",
        helpText =
            "The name of the BigQuery table that stores your processed data. If you reuse an existing "
                + "BigQuery table, the data is appended to the destination table.")
    ValueProvider<String> getOutputTable();

    void setOutputTable(ValueProvider<String> value);

        order = 4,
        description = "Temporary directory for BigQuery loading process",
        helpText = "The temporary directory to use during the BigQuery loading process.",
        example = "gs://your-bucket/your-files/temp_dir")
    ValueProvider<String> getBigQueryLoadingTemporaryDirectory();

    void setBigQueryLoadingTemporaryDirectory(ValueProvider<String> directory);

        order = 5,
        description = "BigQuery output table for bad records",
        helpText =
            "The name of the BigQuery table to use to store the rejected data when processing the"
                + " CSV files. If you reuse an existing BigQuery table, the data is appended to the"
                + " destination table. The schema of this table must match the"
                + " error table schema (https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-csv-to-bigquery#GcsCSVToBigQueryBadRecordsSchema).")
    ValueProvider<String> getBadRecordsOutputTable();

    void setBadRecordsOutputTable(ValueProvider<String> value);

  private static final Logger LOG = LoggerFactory.getLogger(CSVToBigQuery.class);

  private static final String BIGQUERY_SCHEMA = "BigQuery Schema";
  private static final String NAME = "name";
  private static final String TYPE = "type";
  private static final String MODE = "mode";
  private static final String RECORD_TYPE = "RECORD";
  private static final String FIELDS_ENTRY = "fields";

  /** The tag for the headers of the CSV if required. */
  private static final TupleTag<String> CSV_HEADERS = new TupleTag<String>() {};

  /** The tag for the lines of the CSV. */
  private static final TupleTag<String> CSV_LINES = new TupleTag<String>() {};

  /** The tag for the line of the CSV that matches destination table schema. */
  private static final TupleTag<TableRow> GOOD_RECORDS = new TupleTag<TableRow>() {};

  /** The tag for the lines of the CSV that does not match destination table schema. */
  private static final TupleTag<TableRow> BAD_RECORDS = new TupleTag<TableRow>() {};

  /** The schema of the BigQuery table for the bad records. */
  private static final TableSchema errorTableSchema =
      new TableSchema()
                  new TableFieldSchema().setName("RawContent").setType("STRING"),
                  new TableFieldSchema().setName("ErrorMsg").setType("STRING")));

  private static class StringToTableRowFn extends DoFn<String, TableRow> {
    private final ValueProvider<String> delimiter;
    private final NestedValueProvider<List<String>, String> fields;

    public StringToTableRowFn(
        NestedValueProvider<List<String>, String> schemaFields, ValueProvider<String> delimiter) {
      this.delimiter = delimiter;
      this.fields = schemaFields;

    public void processElement(ProcessContext context) throws IllegalArgumentException {
      TableRow outputTableRow = new TableRow();
      String[] rowValue =
          Splitter.on(delimiter.get()).splitToList(context.element()).toArray(new String[0]);
      if (rowValue.length != fields.get().size()) {
        LOG.error("Number of fields in the schema and number of Csv headers do not match.");
        outputTableRow.set("RawContent", String.join(delimiter.get(), rowValue));
            "ErrorMsg", "Number of fields in the schema and number of Csv headers do not match.");
        context.output(BAD_RECORDS, outputTableRow);
      } else {
        for (int i = 0; i < fields.get().size(); ++i) {
          outputTableRow.set(fields.get().get(i), rowValue[i]);
        context.output(GOOD_RECORDS, outputTableRow);

  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);

    PCollectionTuple tableRows =
                        new StringToTableRowFn(
                                jsonPath -> {
                                  List<String> fields = new ArrayList<>();
                                  SchemaParser schemaParser = new SchemaParser();

                                  try {
                                    JSONObject jsonSchema = schemaParser.parseSchema(jsonPath);
                                    JSONArray bqSchemaJsonArray =

                                    for (int i = 0; i < bqSchemaJsonArray.length(); i++) {
                                      JSONObject inputField = bqSchemaJsonArray.getJSONObject(i);

                                  } catch (Exception e) {
                                    throw new RuntimeException(
                                        "Error parsing schema " + jsonPath, e);
                                  return fields;
                    .withOutputTags(GOOD_RECORDS, TupleTagList.of(BAD_RECORDS)));

            "Insert good records into Bigquery",
                        schemaPath -> {
                          TableSchema tableSchema = new TableSchema();
                          List<TableFieldSchema> fields = new ArrayList<>();
                          SchemaParser schemaParser = new SchemaParser();

                          try {
                            JSONObject jsonSchema = schemaParser.parseSchema(schemaPath);
                            JSONArray bqSchemaJsonArray = jsonSchema.getJSONArray(BIGQUERY_SCHEMA);

                            for (int i = 0; i < bqSchemaJsonArray.length(); i++) {
                              JSONObject inputField = bqSchemaJsonArray.getJSONObject(i);

                          } catch (Exception e) {
                            throw new RuntimeException("Error parsing schema " + schemaPath, e);
                          return tableSchema;

            "Insert bad records into Bigquery",


   * Convert a JSONObject from the Schema JSON to a TableFieldSchema. In case of RECORD, it handles
   * the conversion recursively.
   * @param inputField Input field to convert.
   * @return TableFieldSchema instance to populate the schema.
  private static TableFieldSchema convertToTableFieldSchema(JSONObject inputField) {
    TableFieldSchema field =
        new TableFieldSchema()

    if (inputField.has(MODE)) {

    if (inputField.getString(TYPE) != null && inputField.getString(TYPE).equals(RECORD_TYPE)) {
      List<TableFieldSchema> nestedFields = new ArrayList<>();
      JSONArray fieldsArr = inputField.getJSONArray(FIELDS_ENTRY);
      for (int i = 0; i < fieldsArr.length(); i++) {
        JSONObject nestedJSON = fieldsArr.getJSONObject(i);

    return field;

