Cloud Storage CSV files to BigQuery 模板

Cloud Storage CSV files to BigQuery 流水线是一种批处理流水线,可让您从 Cloud Storage 中存储的 CSV 文件读取数据,并将结果附加到 BigQuery 表。CSV 文件可以未经压缩,也可以采用 Compression 枚举 SDK 页面中列出的格式进行压缩。

流水线要求

如需使用此模板,流水线必须满足以下要求。

BigQuery 架构 JSON 文件

创建一个用于描述 BigQuery 架构的 JSON 文件。 确保架构包含一个名为 BigQuery Schema 的顶级 JSON 数组,且该数组的内容遵循 {"name": "COLUMN_NAME", "type": "DATA_TYPE"} 格式。

Cloud Storage CSV files to BigQuery 批处理模板不支持将数据导入目标 BigQuery 表中的 STRUCT(记录)字段。

下面的 JSON 描述了一个 BigQuery 架构示例:

{
  "BigQuery Schema": [
    {
      "name": "location",
      "type": "STRING"
    },
    {
      "name": "name",
      "type": "STRING"
    },
    {
      "name": "age",
      "type": "STRING"
    },
    {
      "name": "color",
      "type": "STRING"
    },
    {
      "name": "coffee",
      "type": "STRING"
    }
  ]
}

错误表架构

存储 CSV 文件中被拒绝记录的 BigQuery 表必须与此处定义的表架构匹配。

{
  "BigQuery Schema": [
    {
      "name": "RawContent",
      "type": "STRING"
    },
    {
      "name": "ErrorMsg",
      "type": "STRING"
    }
  ]
}

模板参数

必需参数

  • inputFilePattern:包含要处理的文本的 CSV 文件的 Cloud Storage 路径。例如 gs://your-bucket/path/*.csv
  • schemaJSONPath:定义 BigQuery 架构的 JSON 文件的 Cloud Storage 路径。
  • outputTable:存储已处理数据的 BigQuery 表的名称。如果您重复使用现有 BigQuery 表,则数据会被附加到目标表。
  • bigQueryLoadingTemporaryDirectory:在 BigQuery 加载过程中使用的临时目录。例如 gs://your-bucket/your-files/temp_dir
  • badRecordsOutputTable:处理 CSV 文件时用于存储被拒绝数据的 BigQuery 表的名称。如果您重复使用现有 BigQuery 表,则数据将被附加到目标表。 此表的架构必须与错误表架构匹配 (https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-csv-to-bigquery#GcsCSVToBigQueryBadRecordsSchema)。
  • delimiter:CSV 文件使用的列分隔符。例如 ,
  • csvFormat:基于 Apache Commons CSV 格式的 CSV 格式。默认值为 Default

可选参数

  • containsHeaders:CSV 文件中是否包含标题。默认值为 false
  • csvFileEncoding:CSV 文件的字符编码格式。允许的值包括 US-ASCIIISO-8859-1UTF-8UTF-16。默认为 UTF-8。

运行模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the CSV files on Cloud Storage to BigQuery (Batch) template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/GCS_CSV_to_BigQuery \
    --region REGION_NAME \
    --parameters \
inputFilePattern=PATH_TO_CSV_DATA,\
schemaJSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
outputTable=BIGQUERY_DESTINATION_TABLE,\
badRecordsOutputTable=BIGQUERY_BAD_RECORDS_TABLE,\
csvFormat=CSV_FORMAT,\
delimiter=DELIMITER,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS,\
containsHeaders=CONTAINS_HEADERS,\
csvFileEncoding=CSV_FILE_ENCODING

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • PATH_TO_CSV_DATA:CSV 文件的 Cloud Storage 路径
  • PATH_TO_BIGQUERY_SCHEMA_JSON:包含架构定义的 JSON 文件的 Cloud Storage 路径
  • BIGQUERY_DESTINATION_TABLE:BigQuery 目标表的名称
  • BIGQUERY_BAD_RECORDS_TABLE:BigQuery 有误记录表的名称
  • PATH_TO_TEMP_DIR_ON_GCS:临时目录的 Cloud Storage 路径
  • DELIMITER:CSV 文件分隔符
  • CSV_FORMAT:用于解析记录的 CSV 格式规范
  • CONTAINS_HEADERS:CSV 文件是否包含标题
  • CSV_FILE_ENCODING:CSV 文件中的编码

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/GCS_CSV_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern":"PATH_TO_CSV_DATA",
       "schemaJSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "outputTable":"BIGQUERY_DESTINATION_TABLE",
       "badRecordsOutputTable":"BIGQUERY_BAD_RECORDS_TABLE",
       "csvFormat":"CSV_FORMAT",
       "delimiter":"DELIMITER",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS",
       "containsHeaders": "CONTAINS_HEADERS",
       "csvFileEncoding": "CSV_FILE_ENCODING"
   },
   "environment": { "zone": "us-central1-f" }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • PATH_TO_CSV_DATA:CSV 文件的 Cloud Storage 路径
  • PATH_TO_BIGQUERY_SCHEMA_JSON:包含架构定义的 JSON 文件的 Cloud Storage 路径
  • BIGQUERY_DESTINATION_TABLE:BigQuery 目标表的名称
  • BIGQUERY_BAD_RECORDS_TABLE:BigQuery 有误记录表的名称
  • PATH_TO_TEMP_DIR_ON_GCS:临时目录的 Cloud Storage 路径
  • DELIMITER:CSV 文件分隔符
  • CSV_FORMAT:用于解析记录的 CSV 格式规范
  • CONTAINS_HEADERS:CSV 文件是否包含标题
  • CSV_FILE_ENCODING:CSV 文件中的编码

后续步骤