Cloud Storage CSV files to BigQuery template

The Cloud Storage CSV files to BigQuery pipeline is a batch pipeline that lets you read data from CSV files stored in Cloud Storage and append the result to a BigQuery table. The CSV files can be uncompressed or compressed in formats listed in the Compression Enum SDK page.

Pipeline requirements

To use this template, your pipeline must meet the following requirements.

BigQuery schema JSON file

Create a JSON file that describes your BigQuery schema. Ensure that the schema has a top-level JSON array titled BigQuery Schema and that its contents follow the pattern {"name": "COLUMN_NAME", "type": "DATA_TYPE"}.

The Cloud Storage CSV files to BigQuery batch template doesn't support importing data into STRUCT (Record) fields in the target BigQuery table.

The following JSON describes an example BigQuery schema:

{
  "BigQuery Schema": [
    {
      "name": "location",
      "type": "STRING"
    },
    {
      "name": "name",
      "type": "STRING"
    },
    {
      "name": "age",
      "type": "STRING"
    },
    {
      "name": "color",
      "type": "STRING"
    },
    {
      "name": "coffee",
      "type": "STRING"
    }
  ]
}

Error table schema

The BigQuery table that stores the rejected records from CSV files must match the table schema defined here.

{
  "BigQuery Schema": [
    {
      "name": "RawContent",
      "type": "STRING"
    },
    {
      "name": "ErrorMsg",
      "type": "STRING"
    }
  ]
}

Template parameters

Parameter Description
inputFilePattern The Cloud Storage path to the CSV file that contains the text to process. For example, gs://path/to/my/text/data.csv.
schemaJSONPath The Cloud Storage path to the JSON file that defines your BigQuery schema. For example, gs://path/to/my/schema.json.
outputTable The name of the BigQuery table that stores your processed data. If you reuse an existing BigQuery table, the data is appended to the destination table. For example, my-project-name:my-dataset.my-table.
bigQueryLoadingTemporaryDirectory The temporary directory to use during the BigQuery loading process. For example, gs://my-bucket/my-files/temp_dir.
badRecordsOutputTable The name of the BigQuery table to use to store the rejected data when processing the CSV files. If you reuse an existing BigQuery table, the data is appended to the destination table. For example, my-project-name:my-dataset.my-bad-records-table. The schema of this table must match the error table schema.
delimiter The column delimiter of the input CSV files. For example, ",".
csvFormat The CSV format specification to use for parsing records. For example, Default. This value must exactly match the format names in the CSVFormat.Predefined Enum.
containsHeaders Whether the input CSV files contain a header record. Defaults to false.
csvFileEncoding The CSV file character encoding format. Allowed values are US-ASCII, ISO-8859-1, UTF-8, and UTF-16. Defaults to UTF-8.

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. Go to Create job from template
  3. In the Job name field, enter a unique job name.
  4. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  5. From the Dataflow template drop-down menu, select the CSV files on Cloud Storage to BigQuery (Batch) template.
  6. In the provided parameter fields, enter your parameter values.
  7. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/GCS_CSV_to_BigQuery \
    --region REGION_NAME \
    --parameters \
inputFilePattern=PATH_TO_CSV_DATA,\
schemaJSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
outputTable=BIGQUERY_DESTINATION_TABLE,\
badRecordsOutputTable=BIGQUERY_BAD_RECORDS_TABLE,\
csvFormat=CSV_FORMAT,\
delimiter=DELIMITER,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS,\
containsHeaders=CONTAINS_HEADERS,\
csvFileEncoding=CSV_FILE_ENCODING

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • REGION_NAME: the region where you want to deploy your Dataflow job—for example, us-central1
  • PATH_TO_CSV_DATA: the Cloud Storage path to your CSV files
  • PATH_TO_BIGQUERY_SCHEMA_JSON: the Cloud Storage path to the JSON file containing the schema definition
  • BIGQUERY_DESTINATION_TABLE: the BigQuery destination table name
  • BIGQUERY_BAD_RECORDS_TABLE: the BigQuery bad records table name
  • PATH_TO_TEMP_DIR_ON_GCS: the Cloud Storage path to the temp directory
  • DELIMITER: CSV file delimiter
  • CSV_FORMAT: CSV format specification for parsing records
  • CONTAINS_HEADERS: whether the CSV files contains headers
  • CSV_FILE_ENCODING: encoding in the CSV files

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/GCS_CSV_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern":"PATH_TO_CSV_DATA",
       "schemaJSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "outputTable":"BIGQUERY_DESTINATION_TABLE",
       "badRecordsOutputTable":"BIGQUERY_BAD_RECORDS_TABLE",
       "csvFormat":"CSV_FORMAT",
       "delimiter":"DELIMITER",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS",
       "containsHeaders": "CONTAINS_HEADERS",
       "csvFileEncoding": "CSV_FILE_ENCODING"
   },
   "environment": { "zone": "us-central1-f" }
}

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • LOCATION: the region where you want to deploy your Dataflow job—for example, us-central1
  • PATH_TO_CSV_DATA: the Cloud Storage path to your CSV files
  • PATH_TO_BIGQUERY_SCHEMA_JSON: the Cloud Storage path to the JSON file containing the schema definition
  • BIGQUERY_DESTINATION_TABLE: the BigQuery destination table name
  • BIGQUERY_BAD_RECORDS_TABLE: the BigQuery bad records table name
  • PATH_TO_TEMP_DIR_ON_GCS: the Cloud Storage path to the temp directory
  • DELIMITER: CSV file delimiter
  • CSV_FORMAT: CSV format specification for parsing records
  • CONTAINS_HEADERS: whether the CSV files contains headers
  • CSV_FILE_ENCODING: encoding in the CSV files

What's next