La pipeline dei file CSV di Cloud Storage in BigQuery è una pipeline batch che consente di leggere i dati dai file CSV archiviati in Cloud Storage e di accodare il risultato a una tabella BigQuery. I file CSV possono essere non compressi o compressi nei formati elencati nella pagina SDK Enum Compression.

Requisiti della pipeline

Per utilizzare questo modello, la pipeline deve soddisfare i seguenti requisiti.

File JSON dello schema BigQuery

Crea un file JSON che descriva lo schema BigQuery. Assicurati che lo schema abbia un array JSON di primo livello denominato BigQuery Schema e che i suoi contenuti rispettino il pattern {"name": "COLUMN_NAME", "type": "DATA_TYPE"}.

Il modello batch per i file CSV di Cloud Storage in BigQuery non supporta l'importazione dei dati nei campi STRUCT (Record) della tabella BigQuery di destinazione.

Il seguente JSON descrive uno schema BigQuery di esempio:

  "BigQuery Schema": [
      "name": "location",
      "type": "STRING"
      "name": "name",
      "type": "STRING"
      "name": "age",
      "type": "STRING"
      "name": "color",
      "type": "STRING"
      "name": "coffee",
      "type": "STRING"

Schema della tabella degli errori

La tabella BigQuery che memorizza i record rifiutati dai file CSV deve corrispondere allo schema della tabella definito qui.

  "BigQuery Schema": [
      "name": "RawContent",
      "type": "STRING"
      "name": "ErrorMsg",
      "type": "STRING"

Parametri del modello

Parametri obbligatori

  • patternFileInput: il percorso Cloud Storage del file CSV contenente il testo da elaborare. Ad esempio, gs://your-bucket/path/*.csv.
  • schemaJSONPath: il percorso in Cloud Storage del file JSON che definisce lo schema BigQuery.
  • outputTable: il nome della tabella BigQuery che memorizza i dati elaborati. Se riutilizzi una tabella BigQuery esistente, i dati vengono aggiunti alla tabella di destinazione.
  • bigQueryLoadingTemporaryDirectory: la directory temporanea da utilizzare durante il processo di caricamento di BigQuery. Ad esempio, gs://your-bucket/your-files/temp_dir.
  • badRecordsOutputTable: il nome della tabella BigQuery da utilizzare per archiviare i dati rifiutati durante l'elaborazione dei file CSV. Se riutilizzi una tabella BigQuery esistente, i dati vengono aggiunti alla tabella di destinazione. Lo schema di questa tabella deve corrispondere a quello della tabella degli errori (
  • delimiter: il delimitatore di colonna utilizzato dal file CSV. Ad esempio, ,.
  • csvFormat: il formato CSV secondo il formato CSV di Apache Commons. Il valore predefinito è Default.

Parametri facoltativi

  • containsHeaders: indica se le intestazioni sono incluse nel file CSV. Il valore predefinito è false.
  • csvFileEncoding: il formato di codifica dei caratteri del file CSV. I valori consentiti sono US-ASCII, ISO-8859-1, UTF-8 e UTF-16. Il valore predefinito è UTF-8.

Esegui il modello

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Nel menu a discesa Modello di flusso di dati, seleziona the CSV files on Cloud Storage to BigQuery (Batch) template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

Nella shell o nel terminale, esegui il modello:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/GCS_CSV_to_BigQuery \
    --region REGION_NAME \
    --parameters \

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • PATH_TO_CSV_DATA: il percorso Cloud Storage dei file CSV
  • PATH_TO_BIGQUERY_SCHEMA_JSON: il percorso di Cloud Storage al file JSON contenente la definizione dello schema
  • BIGQUERY_DESTINATION_TABLE: il nome della tabella di destinazione BigQuery
  • BIGQUERY_BAD_RECORDS_TABLE: il nome della tabella dei record errati di BigQuery
  • PATH_TO_TEMP_DIR_ON_GCS: il percorso di Cloud Storage alla directory temporanea
  • DELIMITER: delimitatore del file CSV
  • CSV_FORMAT: specifica del formato CSV per l'analisi dei record
  • CONTAINS_HEADERS: indica se i file CSV contengono intestazioni
  • CSV_FILE_ENCODING: codifica nei file CSV

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

   "jobName": "JOB_NAME",
   "parameters": {
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS",
       "containsHeaders": "CONTAINS_HEADERS",
       "csvFileEncoding": "CSV_FILE_ENCODING"
   "environment": { "zone": "us-central1-f" }

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • PATH_TO_CSV_DATA: il percorso Cloud Storage dei file CSV
  • PATH_TO_BIGQUERY_SCHEMA_JSON: il percorso di Cloud Storage al file JSON contenente la definizione dello schema
  • BIGQUERY_DESTINATION_TABLE: il nome della tabella di destinazione BigQuery
  • BIGQUERY_BAD_RECORDS_TABLE: il nome della tabella dei record errati di BigQuery
  • PATH_TO_TEMP_DIR_ON_GCS: il percorso di Cloud Storage alla directory temporanea
  • DELIMITER: delimitatore del file CSV
  • CSV_FORMAT: specifica del formato CSV per l'analisi dei record
  • CONTAINS_HEADERS: indica se i file CSV contengono intestazioni
  • CSV_FILE_ENCODING: codifica nei file CSV
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.json.JSONArray;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

 * Templated pipeline to read CSV files from Cloud Storage, and write it to BigQuery.
 * <p>Check out <a
 * href="">README</a>
 * for instructions on how to use or modify this template.
    name = "GCS_CSV_to_BigQuery",
    category = TemplateCategory.BATCH,
    displayName = "CSV Files on Cloud Storage to BigQuery",
    description =
        "The Cloud Storage CSV to BigQuery pipeline is a batch pipeline that allows you to read CSV files stored in "
            + "Cloud Storage, and append the result to a BigQuery table. The CSV files can be uncompressed or compressed in formats listed in",
    optionsClass = Options.class,
    contactInformation = "",
    requirements = {
      "Create a JSON file that describes your {{bigquery_name_short}} schema.\n"
          + "    <p>Ensure that there is a top-level JSON array titled <code>BigQuery Schema</code> and that its\n"
          + "      contents follow the pattern <code>{\"name\": \"COLUMN_NAME\", \"type\": \"DATA_TYPE\"}</code>.</p>\n"
          + "    <p>The following JSON describes an example BigQuery schema:</p>\n"
          + "<pre class=\"prettyprint lang-json\">\n"
          + "{\n"
          + "  \"BigQuery Schema\": [\n"
          + "    {\n"
          + "      \"name\": \"location\",\n"
          + "      \"type\": \"STRING\"\n"
          + "    },\n"
          + "    {\n"
          + "      \"name\": \"name\",\n"
          + "      \"type\": \"STRING\"\n"
          + "    },\n"
          + "    {\n"
          + "      \"name\": \"age\",\n"
          + "      \"type\": \"STRING\"\n"
          + "    },\n"
          + "    {\n"
          + "      \"name\": \"color\",\n"
          + "      \"type\": \"STRING\"\n"
          + "    },\n"
          + "    {\n"
          + "      \"name\": \"coffee\",\n"
          + "      \"type\": \"STRING\"\n"
          + "    }\n"
          + "  ]\n"
          + "}\n"
public class CSVToBigQuery {

  /** Options supported by {@link CSVToBigQuery}. */
  public interface Options extends DataflowPipelineOptions, CsvPipelineOptions {

        order = 1,
        groupName = "Source",
        description = "Cloud Storage Input File(s)",
        helpText = "The Cloud Storage path to the CSV file that contains the text to process.",
        regexes = {"^gs:\\/\\/[^\\n\\r]+$"},
        example = "gs://your-bucket/path/*.csv")
    ValueProvider<String> getInputFilePattern();

    void setInputFilePattern(ValueProvider<String> value);

        order = 2,
        groupName = "Target",
        description = "Cloud Storage location of your BigQuery schema file, described as a JSON",
        helpText = "The Cloud Storage path to the JSON file that defines your BigQuery schema.")
    ValueProvider<String> getSchemaJSONPath();

    void setSchemaJSONPath(ValueProvider<String> value);

        order = 3,
        groupName = "Target",
        description = "BigQuery output table",
        helpText =
            "The name of the BigQuery table that stores your processed data. If you reuse an existing "
                + "BigQuery table, the data is appended to the destination table.")
    ValueProvider<String> getOutputTable();

    void setOutputTable(ValueProvider<String> value);

        order = 4,
        description = "Temporary directory for BigQuery loading process",
        helpText = "The temporary directory to use during the BigQuery loading process.",
        example = "gs://your-bucket/your-files/temp_dir")
    ValueProvider<String> getBigQueryLoadingTemporaryDirectory();

    void setBigQueryLoadingTemporaryDirectory(ValueProvider<String> directory);

        order = 5,
        description = "BigQuery output table for bad records",
        helpText =
            "The name of the BigQuery table to use to store the rejected data when processing the"
                + " CSV files. If you reuse an existing BigQuery table, the data is appended to the"
                + " destination table. The schema of this table must match the"
                + " error table schema (")
    ValueProvider<String> getBadRecordsOutputTable();

    void setBadRecordsOutputTable(ValueProvider<String> value);

  private static final Logger LOG = LoggerFactory.getLogger(CSVToBigQuery.class);

  private static final String BIGQUERY_SCHEMA = "BigQuery Schema";
  private static final String NAME = "name";
  private static final String TYPE = "type";
  private static final String MODE = "mode";
  private static final String RECORD_TYPE = "RECORD";
  private static final String FIELDS_ENTRY = "fields";

  /** The tag for the headers of the CSV if required. */
  private static final TupleTag<String> CSV_HEADERS = new TupleTag<String>() {};

  /** The tag for the lines of the CSV. */
  private static final TupleTag<String> CSV_LINES = new TupleTag<String>() {};

  /** The tag for the line of the CSV that matches destination table schema. */
  private static final TupleTag<TableRow> GOOD_RECORDS = new TupleTag<TableRow>() {};

  /** The tag for the lines of the CSV that does not match destination table schema. */
  private static final TupleTag<TableRow> BAD_RECORDS = new TupleTag<TableRow>() {};

  /** The schema of the BigQuery table for the bad records. */
  private static final TableSchema errorTableSchema =
      new TableSchema()
                  new TableFieldSchema().setName("RawContent").setType("STRING"),
                  new TableFieldSchema().setName("ErrorMsg").setType("STRING")));

  private static class StringToTableRowFn extends DoFn<String, TableRow> {
    private final ValueProvider<String> delimiter;
    private final NestedValueProvider<List<String>, String> fields;

    public StringToTableRowFn(
        NestedValueProvider<List<String>, String> schemaFields, ValueProvider<String> delimiter) {
      this.delimiter = delimiter;
      this.fields = schemaFields;

    public void processElement(ProcessContext context) throws IllegalArgumentException {
      TableRow outputTableRow = new TableRow();
      String[] rowValue =
          Splitter.on(delimiter.get()).splitToList(context.element()).toArray(new String[0]);
      if (rowValue.length != fields.get().size()) {
        LOG.error("Number of fields in the schema and number of Csv headers do not match.");
        outputTableRow.set("RawContent", String.join(delimiter.get(), rowValue));
            "ErrorMsg", "Number of fields in the schema and number of Csv headers do not match.");
        context.output(BAD_RECORDS, outputTableRow);
      } else {
        for (int i = 0; i < fields.get().size(); ++i) {
          outputTableRow.set(fields.get().get(i), rowValue[i]);
        context.output(GOOD_RECORDS, outputTableRow);

  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);

    PCollectionTuple tableRows =
                        new StringToTableRowFn(
                                jsonPath -> {
                                  List<String> fields = new ArrayList<>();
                                  SchemaParser schemaParser = new SchemaParser();

                                  try {
                                    JSONObject jsonSchema = schemaParser.parseSchema(jsonPath);
                                    JSONArray bqSchemaJsonArray =

                                    for (int i = 0; i < bqSchemaJsonArray.length(); i++) {
                                      JSONObject inputField = bqSchemaJsonArray.getJSONObject(i);

                                  } catch (Exception e) {
                                    throw new RuntimeException(
                                        "Error parsing schema " + jsonPath, e);
                                  return fields;
                    .withOutputTags(GOOD_RECORDS, TupleTagList.of(BAD_RECORDS)));

            "Insert good records into Bigquery",
                        schemaPath -> {
                          TableSchema tableSchema = new TableSchema();
                          List<TableFieldSchema> fields = new ArrayList<>();
                          SchemaParser schemaParser = new SchemaParser();

                          try {
                            JSONObject jsonSchema = schemaParser.parseSchema(schemaPath);
                            JSONArray bqSchemaJsonArray = jsonSchema.getJSONArray(BIGQUERY_SCHEMA);

                            for (int i = 0; i < bqSchemaJsonArray.length(); i++) {
                              JSONObject inputField = bqSchemaJsonArray.getJSONObject(i);

                          } catch (Exception e) {
                            throw new RuntimeException("Error parsing schema " + schemaPath, e);
                          return tableSchema;

            "Insert bad records into Bigquery",

   * Convert a JSONObject from the Schema JSON to a TableFieldSchema. In case of RECORD, it handles
   * the conversion recursively.
   * @param inputField Input field to convert.
   * @return TableFieldSchema instance to populate the schema.
  private static TableFieldSchema convertToTableFieldSchema(JSONObject inputField) {
    TableFieldSchema field =
        new TableFieldSchema()

    if (inputField.has(MODE)) {

    if (inputField.getString(TYPE) != null && inputField.getString(TYPE).equals(RECORD_TYPE)) {
      List<TableFieldSchema> nestedFields = new ArrayList<>();
      JSONArray fieldsArr = inputField.getJSONArray(FIELDS_ENTRY);
      for (int i = 0; i < fieldsArr.length(); i++) {
        JSONObject nestedJSON = fieldsArr.getJSONObject(i);

    return field;

