从 Cloud Storage 加载 CSV 数据

从 Cloud Storage 加载 CSV 文件

从 Cloud Storage 加载 CSV 数据时,可以将数据加载到新的表或分区中,也可以将数据附加到现有的表或分区或覆盖现有的表或分区。当数据加载到 BigQuery 时,会被转化为适用于 Capacitor 的列式格式(BigQuery 的存储格式)。

如需将 Cloud Storage 中的数据加载到 BigQuery 表,则包含该表的数据集必须与相应 Cloud Storage 存储分区位于同一区域或多区域位置。

如需了解如何从本地文件加载 CSV 数据,请参阅将本地数据源中的数据加载到 BigQuery 中

限制

如果要将 Cloud Storage 中的 CSV 数据加载到 BigQuery,请注意以下事项:

  • CSV 文件不支持嵌套或重复数据。
  • 如果使用 gzip 压缩,BigQuery 将无法并行读取数据。与加载未压缩数据相比,将压缩的 CSV 数据加载到 BigQuery 的速度较为缓慢。
  • 您无法在同一个加载作业中同时包含压缩文件和非压缩文件。
  • 加载 CSV 或 JSON 数据时,DATE 列的值必须使用英文短划线 (-) 分隔符,并且日期必须采用以下格式:YYYY-MM-DD(年-月-日)。
  • 加载 JSON 或 CSV 数据时,TIMESTAMP 列的值必须使用英文短划线 (-) 分隔符来分隔时间戳的日期部分,并且日期必须采用以下格式:YYYY-MM-DD(年-月-日)。时间戳的 hh:mm:ss(时-分-秒)部分必须使用英文冒号 (:) 分隔符。

所需权限

在将数据加载到 BigQuery 时,您需要拥有相关权限,才能运行加载作业并将数据加载到新的或现有的 BigQuery 表和分区中。如果要从 Cloud Storage 加载数据,您还需要拥有对您的数据所在的存储分区的访问权限。

BigQuery 权限

至少需具备以下权限,才能将数据加载到 BigQuery。无论您是要将数据加载到新的表或分区,还是要附加到或覆盖现有的表或分区,都必须具备这些权限。

  • bigquery.tables.create
  • bigquery.tables.updateData
  • bigquery.jobs.create

以下预定义的 IAM 角色同时具有 bigquery.tables.createbigquery.tables.updateData 权限:

  • bigquery.dataEditor
  • bigquery.dataOwner
  • bigquery.admin

以下预定义的 IAM 角色包含 bigquery.jobs.create 权限:

  • bigquery.user
  • bigquery.jobUser
  • bigquery.admin

此外,如果用户具有 bigquery.datasets.create 权限,则当该用户创建数据集时,系统会为其授予该数据集的 bigquery.dataOwner 访问权限。借助 bigquery.dataOwner 访问权限,用户可以通过使用加载作业在数据集中创建和更新表。

如需详细了解 BigQuery 中的 IAM 角色和权限,请参阅访问控制

Cloud Storage 权限

如需从 Cloud Storage 存储分区中加载数据,您必须获得 storage.objects.get 权限。如果要使用 URI 通配符,您还必须具有 storage.objects.list 权限。

授予预定义的 IAM 角色 storage.objectViewer,即可同时提供 storage.objects.getstorage.objects.list 权限。

将 CSV 数据加载到表中

您可以通过以下方式将 CSV 数据从 Cloud Storage 加载到新的 BigQuery 表中:

  • 使用 Cloud Console 或经典版网页界面
  • 使用 bq 命令行工具的 bq load 命令
  • 调用 jobs.insert API 方法并配置 load 作业
  • 使用客户端库

如需将 CSV 数据从 Cloud Storage 加载到新的 BigQuery 表中,请执行以下操作:

控制台

  1. 在 Cloud Console 中打开 BigQuery 网页界面。
    转到 Cloud Console

  2. 在导航面板的资源部分中,展开您的项目并选择数据集。

  3. 在窗口右侧的详细信息面板中,点击创建表。加载数据的过程与创建空表的过程相同。

    创建表

  4. 创建表页面的来源部分,执行以下操作:

    • 基于以下数据创建表部分,选择 Cloud Storage。

    • 在来源字段中,浏览至或输入 Cloud Storage URI。请注意,Cloud Console 不支持添加多个 URI,但支持使用通配符。Cloud Storage 存储分区必须与您要创建的表所属的数据集位于同一位置。

      选择文件

    • 文件格式部分,选择 CSV

  5. 创建表页面的目标位置部分,执行以下操作:

    • 数据集名称部分,选择相应数据集。

      查看数据集

    • 确认表类型设置为原生表

    • 表名称字段中,输入您要在 BigQuery 中创建的表的名称。

  6. 架构部分的自动检测下,勾选架构和输入参数,以启用架构自动检测功能。或者,您可以通过以下方式手动输入架构定义:

    • 启用以文本形式修改,并以 JSON 数组格式输入表架构。

      以 JSON 数组格式添加架构

    • 使用添加字段手动输入架构。

      使用“添加字段”按钮添加架构定义

  7. (可选)如需对表进行分区,请在分区和聚簇设置中选择相应选项:

    • 如需创建分区表,请点击不进行分区,选择按字段分区 (Partition by field),然后选择 DATETIMESTAMP 列。如果架构不包含 DATETIMESTAMP 列,则此选项不可用。
    • 如需创建提取时间分区表,请点击不进行分区,然后选择按提取时间分区
  8. (可选)在分区过滤条件中,点击需要分区过滤条件框,以要求用户添加 WHERE 子句来指定要查询的分区。要求分区过滤条件有可能减少费用并提高性能。如需了解详情,请参阅查询分区表。如果已选择不进行分区,则此选项不可用。

  9. (可选)如需对该表进行聚簇,请在聚簇顺序框中,输入一到四个字段名称。

  10. (可选)点击高级选项

    • 写入偏好设置部分,选中只写入空白表。此选项创建一个新表并向其中加载数据。
    • 允许的错误数部分中,接受默认值 0 或输入可忽略的含错行数上限。如果含错行数超过此值,该作业将生成 invalid 消息并失败。
    • 未知值部分,勾选忽略未知值以忽略未出现在表架构中的行内的所有值。
    • 字段分隔符部分,选择用于分隔 CSV 文件中的单元格的字符:英文逗号制表符竖线符自定义分隔符。如果您选择自定义,请在自定义字段分隔符框中输入分隔符。默认值为英文逗号
    • Header rows to skip 部分,输入在 CSV 文件顶部要跳过的标题行数。默认值为 0
    • 引用的数据中包含换行符 (Quoted newlines) 部分,勾选允许引用的数据中包含换行符以允许 CSV 文件中引用的数据部分包含换行符。默认值为 false
    • 可选列留空的行 (Jagged rows) 部分,勾选允许使用可选列留空的行以接受 CSV 文件中末尾处缺少可选列的行。缺失值被视为 null 值。如果未勾选,则末尾处缺少列的记录将被视为错误记录;如果错误记录太多,作业结果中将返回一个无效错误。默认值为 false
    • 加密部分中,点击客户管理的密钥,以使用 Cloud Key Management Service 密钥。如果保留 Google 管理的密钥设置,BigQuery 将对静态数据进行加密
  11. 点击创建表

经典版界面

  1. 转到 BigQuery 网页界面。
    转到 BigQuery 网页界面

  2. 在导航面板中,将鼠标悬停在数据集上,点击向下箭头图标 向下箭头图标图片,然后点击 Create new table。加载数据的过程与创建空表的过程相同。

  3. Create Table 页面的 Source Data 部分,执行以下操作:

    • 点击 Create from source
    • Location 部分选择 Cloud Storage,然后在来源字段中输入 Cloud Storage URI。请注意,BigQuery 网页界面不支持添加多个 URI,但支持使用通配符。Cloud Storage 存储分区必须与您要创建的表所属的数据集位于同一位置。
    • File format 部分,选择 CSV
  4. Destination Table 部分,执行以下操作:

    • Table name 部分,选择适当的数据集,然后在表名称字段中输入要在 BigQuery 中创建的表的名称。
    • 确认 Table type 设置为 Native table
  5. Schema 部分的 Auto detect 下,勾选 Schema and input parameters,以启用架构自动检测功能。或者,您可以通过以下方式手动输入架构定义:

    • 点击 Edit as text,并以 JSON 数组格式输入表架构。

      以 JSON 数组格式添加架构

    • 使用 Add Field 手动输入架构:

      使用添加字段添加架构

  6. (可选)在 Options 部分,执行以下操作:

    • Field delimiter 部分,选择用于分隔 CSV 文件中的单元格的字符:CommaTabPipeOther。如果您选择 Other,请在 Custom field delimiter 框中输入分隔符。默认值为 Comma
    • Header rows to skip 部分,输入在 CSV 文件顶部要跳过的标题行数。默认值为 0
    • Number of errors allowed 部分中,接受默认值 0 或输入可忽略的含错行数上限。如果含错行数超过此值,该作业将生成 invalid 消息并失败。
    • Allow quoted newlines 部分,勾选方框以允许 CSV 文件中引用的数据部分包含换行符。默认值为 false
    • Allow jagged rows 部分,勾选方框以接受 CSV 文件中末尾处缺少可选列的行。缺失值被视为 null 值。如果未勾选,则末尾处缺少列的记录将被视为错误记录;如果错误记录太多,作业结果中将返回一个无效错误。默认值为 false
    • Ignore unknown values 部分,勾选方框以忽略未出现在表架构中的行内的所有值。
    • Write preference 部分,选中 Write if empty。此选项创建一个新表并向其中加载数据。
    • 如需对表进行分区,请执行以下操作:
      • 对于 Partitioning Type,点击 None 并选择 Day
      • 对于 Partitioning Field
      • 如需创建分区表,请选择 DATETIMESTAMP 列。如果架构不包含 DATETIMESTAMP 列,则此选项不可用。
      • 如需创建提取时间分区表,请保留默认值:_PARTITIONTIME
      • 点击 Require partition filter 框,以要求用户添加 WHERE 子句来指定要查询的分区。要求分区过滤条件有可能减少费用并提高性能。 如需了解详情,请参阅查询分区表。如果 Partitioning type 设置为 None,则此选项不可用。
    • 如需对该表进行聚簇,请在 Clustering fields 框中,输入一到四个字段名称。
    • Destination encryption 部分中,选择 Customer-managed encryption,以使用 Cloud Key Management Service 密钥来加密表。如果保留 Default 设置,BigQuery 将使用 Google 管理的密钥对静态数据进行加密
  7. 点击创建表

bq

使用 bq load 命令,通过 --source_format 标志指定 CSV,并添加 Cloud Storage URI。您可以添加单个 URI、以英文逗号分隔的 URI 列表或含有通配符的 URI。在架构定义文件中以内嵌形式提供架构,或者使用架构自动检测功能。

(可选)添加 --location 标志并将其值设置为您的位置

其他可选标志包括:

  • --allow_jagged_rows:指定此标志时,系统会接受 CSV 文件中末尾处缺少可选列的行。缺失值被视为 null 值。如果未勾选,则末尾处缺少列的记录将被视为错误记录;如果错误记录太多,作业结果中将返回一个无效错误。默认值为 false
  • --allow_quoted_newlines:指定此标志时,系统会允许 CSV 文件中引用的数据部分包含换行符。默认值为 false
  • --field_delimiter:此标志表示的是指明数据中各列之间边界的字符。可以使用 \ttab 作为制表符分隔符。默认值为 ,
  • --null_marker:此标志表示一个可选的自定义字符串,该字符串代表 CSV 数据中的一个 NULL 值。
  • --skip_leading_rows:指定在 CSV 文件顶部要跳过的标题行数。默认值为 0
  • --quote:此标志表示用于括起记录的引号字符。默认值为 "。如需表示无引号字符,请使用空字符串。
  • --max_bad_records:此标志表示一个整数,指定了作业中允许的最大错误记录数量,超过此数量之后,整个作业就会失败。默认值为 0。无论 --max_bad_records 值设为多少,系统最多只会返回 5 个任意类型的错误。
  • --ignore_unknown_values:如果指定此标志,系统会允许并忽略 CSV 或 JSON 数据中无法识别的额外值。
  • --autodetect:指定此标志时,系统会为 CSV 和 JSON 数据启用架构自动检测功能。
  • --time_partitioning_type:此标志会在表上启用基于时间的分区,并设置分区类型。目前,唯一可以使用的值为 DAY,即每天生成一个分区。当您按 DATETIMESTAMP 列创建分区表时,可选用此标志。
  • --time_partitioning_expiration:此标志值为一个整数,指定了应在何时删除基于时间的分区(以秒为单位)。过期时间按分区的世界协调时间 (UTC) 日期加上这个整数值计算得出。
  • --time_partitioning_field:此标志表示用于创建分区表DATETIMESTAMP 列。如果在未提供此值的情况下启用了基于时间的分区,系统会创建提取时间分区表
  • --require_partition_filter:启用后,此选项会要求用户添加 WHERE 子句来指定要查询的分区。要求分区过滤条件有可能减少费用并提高性能。 如需了解详情,请参阅查询分区表
  • --clustering_fields:此标志表示以英文逗号分隔的列名称列表(最多包含 4 个列名称),用于创建聚簇表
  • --destination_kms_key:用于加密表数据的 Cloud KMS 密钥。

    如需详细了解分区表,请参阅:

    如需详细了解聚簇表,请参阅:

    如需详细了解表加密,请参阅以下部分:

如需将 CSV 数据加载到 BigQuery,请输入以下命令:

bq --location=location load \
--source_format=format \
dataset.table \
path_to_source \
schema

其中:

  • location 是您所在的位置。--location 是可选标志。例如,如果您在东京地区使用 BigQuery,可将该标志的值设置为 asia-northeast1。您可以使用 .bigqueryrc 文件设置位置的默认值。
  • formatCSV
  • dataset 是现有数据集。
  • table 是要向其中加载数据的表的名称。
  • path_to_source 是完全限定的 Cloud Storage URI 或以英文逗号分隔的 URI 列表。您还可以使用通配符
  • schema 是有效架构。该架构可以是本地 JSON 文件,也可以在命令中以内嵌形式输入架构。您还可以改用 --autodetect 标志,而无需提供架构定义。

示例:

以下命令将 gs://mybucket/mydata.csv 中的数据加载到 mydataset 中名为 mytable 的表中。架构是在名为 myschema.json 的本地架构文件中定义的。

    bq load \
    --source_format=CSV \
    mydataset.mytable \
    gs://mybucket/mydata.csv \
    ./myschema.json

以下命令将 gs://mybucket/mydata.csv 中的数据加载到 mydataset 中名为 mytable 的表中。架构是在名为 myschema.json 的本地架构文件中定义的。该 CSV 文件包含两个标题行。 如果未指定 --skip_leading_rows,则默认行为是假设文件不包含标题。

    bq load \
    --source_format=CSV \
    --skip_leading_rows=2
    mydataset.mytable \
    gs://mybucket/mydata.csv \
    ./myschema.json

以下命令将 gs://mybucket/mydata.csv 中的数据加载到 mydataset 中名为 mytable 的提取时间分区表中。架构是在名为 myschema.json 的本地架构文件中定义的。

    bq load \
    --source_format=CSV \
    --time_partitioning_type=DAY \
    mydataset.mytable \
    gs://mybucket/mydata.csv \
    ./myschema.json

以下命令将 gs://mybucket/mydata.csv 中的数据加载到 mydataset 中名为 mytable 的分区表中。该表按 mytimestamp 列进行分区。架构是在名为 myschema.json 的本地架构文件中定义的。

    bq load \
    --source_format=CSV \
    --time_partitioning_field mytimestamp \
    mydataset.mytable \
    gs://mybucket/mydata.csv \
    ./myschema.json

以下命令将 gs://mybucket/mydata.csv 中的数据加载到 mydataset 中名为 mytable 的表中。架构是自动检测的。

    bq load \
    --autodetect \
    --source_format=CSV \
    mydataset.mytable \
    gs://mybucket/mydata.csv

以下命令将 gs://mybucket/mydata.csv 中的数据加载到 mydataset 中名为 mytable 的表中。架构以内嵌形式定义,格式为:field:data_type,field:data_type

    bq load \
    --source_format=CSV \
    mydataset.mytable \
    gs://mybucket/mydata.csv \
    qtr:STRING,sales:FLOAT,year:STRING

以下命令将 gs://mybucket/ 中多个文件的数据加载到 mydataset 中名为 mytable 的表中。Cloud Storage URI 使用通配符。架构是自动检测的。

    bq load \
    --autodetect \
    --source_format=CSV \
    mydataset.mytable \
    gs://mybucket/mydata*.csv

以下命令将 gs://mybucket/ 中多个文件的数据加载到 mydataset 中名为 mytable 的表中。该命令包含以英文逗号分隔的 Cloud Storage URI 列表(含通配符)。架构是在名为 myschema.json 的本地架构文件中定义的。

    bq load \
    --source_format=CSV \
    mydataset.mytable \
    "gs://mybucket/00/*.csv","gs://mybucket/01/*.csv" \
    ./myschema.json

API

  1. 创建指向 Cloud Storage 中源数据的 load 作业。

  2. (可选)在作业资源 jobReference 部分的 location 属性中指定您的位置

  3. source URIs 属性必须是完全限定的,格式为 gs://bucket/object。每个 URI 只可包含一个“*”通配符

  4. sourceFormat 属性设置为 CSV,以指定 CSV 数据格式。

  5. 如需检查作业状态,请调用 jobs.get(job_id*),其中 job_id 是初始请求返回的作业的 ID。

    • 如果 status.state = DONE,则表示作业已成功完成。
    • 如果出现 status.errorResult 属性,则表示请求失败,并且该对象将包含描述问题的相关信息。如果请求失败,则不创建任何表且不加载任何数据。
    • 如果未出现 status.errorResult,则表示作业已成功完成,但可能存在一些非严重错误,如导入一些行时出错。非严重错误会列在返回的作业对象的 status.errors 属性中。

API 说明

  • 加载作业兼具原子性和一致性。也就是说,如果加载作业失败,则所有数据都不可用;如果加载作业成功,则所有数据全部可用。

  • 通过调用 jobs.insert 来创建加载作业时,最佳做法是生成唯一 ID,并将其作为 jobReference.jobId 传递。此方法受网络故障影响较小,因为客户端可以对已知的作业 ID 进行轮询或重试。

  • 对指定的作业 ID 调用 jobs.insert 具有幂等性。您可以对同一作业 ID 进行任意次重试,但最多只会有一个成功操作。

C#

在尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 C# 设置说明进行操作。如需了解详情,请参阅 BigQuery C# API 参考文档


using Google.Cloud.BigQuery.V2;
using System;

public class BigQueryLoadTableGcsCsv
{
    public void LoadTableGcsCsv(
        string projectId = "your-project-id",
        string datasetId = "your_dataset_id"
    )
    {
        BigQueryClient client = BigQueryClient.Create(projectId);
        var gcsURI = "gs://cloud-samples-data/bigquery/us-states/us-states.csv";
        var dataset = client.GetDataset(datasetId);
        var schema = new TableSchemaBuilder {
            { "name", BigQueryDbType.String },
            { "post_abbr", BigQueryDbType.String }
        }.Build();
        var destinationTableRef = dataset.GetTableReference(
            tableId: "us_states");
        // Create job configuration
        var jobOptions = new CreateLoadJobOptions()
        {
            // The source format defaults to CSV; line below is optional.
            SourceFormat = FileFormat.Csv,
            SkipLeadingRows = 1
        };
        // Create and run job
        var loadJob = client.CreateLoadJob(
            sourceUri: gcsURI, destination: destinationTableRef,
            schema: schema, options: jobOptions);
        loadJob.PollUntilCompleted();  // Waits for the job to complete.
        // Display the number of rows uploaded
        BigQueryTable table = client.GetTable(destinationTableRef);
        Console.WriteLine(
            $"Loaded {table.Resource.NumRows} rows to {table.FullyQualifiedId}");
    }
}

Go

在尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Go 设置说明进行操作。如需了解详情,请参阅 BigQuery Go API 参考文档

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// importCSVExplicitSchema demonstrates loading CSV data from Cloud Storage into a BigQuery
// table and providing an explicit schema for the data.
func importCSVExplicitSchema(projectID, datasetID, tableID string) error {
	// projectID := "my-project-id"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	gcsRef := bigquery.NewGCSReference("gs://cloud-samples-data/bigquery/us-states/us-states.csv")
	gcsRef.SkipLeadingRows = 1
	gcsRef.Schema = bigquery.Schema{
		{Name: "name", Type: bigquery.StringFieldType},
		{Name: "post_abbr", Type: bigquery.StringFieldType},
	}
	loader := client.Dataset(datasetID).Table(tableID).LoaderFrom(gcsRef)
	loader.WriteDisposition = bigquery.WriteEmpty

	job, err := loader.Run(ctx)
	if err != nil {
		return err
	}
	status, err := job.Wait(ctx)
	if err != nil {
		return err
	}

	if status.Err() != nil {
		return fmt.Errorf("job completed with error: %v", status.Err())
	}
	return nil
}

Java

试用此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 BigQuery Java API 参考文档

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.CsvOptions;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.TableId;

// Sample to load CSV data from Cloud Storage into a new BigQuery table
public class LoadCsvFromGcs {

  public static void runLoadCsvFromGcs() throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    String sourceUri = "gs://cloud-samples-data/bigquery/us-states/us-states.csv";
    Schema schema =
        Schema.of(
            Field.of("name", StandardSQLTypeName.STRING),
            Field.of("post_abbr", StandardSQLTypeName.STRING));
    loadCsvFromGcs(datasetName, tableName, sourceUri, schema);
  }

  public static void loadCsvFromGcs(
      String datasetName, String tableName, String sourceUri, Schema schema) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      // Skip header row in the file.
      CsvOptions csvOptions = CsvOptions.newBuilder().setSkipLeadingRows(1).build();

      TableId tableId = TableId.of(datasetName, tableName);
      LoadJobConfiguration loadConfig =
          LoadJobConfiguration.newBuilder(tableId, sourceUri, csvOptions).setSchema(schema).build();

      // Load data from a GCS CSV file into the table
      Job job = bigquery.create(JobInfo.of(loadConfig));
      // Blocks until this load table job completes its execution, either failing or succeeding.
      job = job.waitFor();
      if (job.isDone()) {
        System.out.println("CSV from GCS successfully added during load append job");
      } else {
        System.out.println(
            "BigQuery was unable to load into the table due to an error:"
                + job.getStatus().getError());
      }
    } catch (BigQueryException | InterruptedException e) {
      System.out.println("Column not added during load append \n" + e.toString());
    }
  }
}

Node.js

在尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请查看 BigQuery Node.js API 参考文档

// Import the Google Cloud client libraries
const {BigQuery} = require('@google-cloud/bigquery');
const {Storage} = require('@google-cloud/storage');

// Instantiate clients
const bigquery = new BigQuery();
const storage = new Storage();

/**
 * This sample loads the CSV file at
 * https://storage.googleapis.com/cloud-samples-data/bigquery/us-states/us-states.csv
 *
 * TODO(developer): Replace the following lines with the path to your file.
 */
const bucketName = 'cloud-samples-data';
const filename = 'bigquery/us-states/us-states.csv';

async function loadCSVFromGCS() {
  // Imports a GCS file into a table with manually defined schema.

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'my_table';

  // Configure the load job. For full list of options, see:
  // https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad
  const metadata = {
    sourceFormat: 'CSV',
    skipLeadingRows: 1,
    schema: {
      fields: [
        {name: 'name', type: 'STRING'},
        {name: 'post_abbr', type: 'STRING'},
      ],
    },
    location: 'US',
  };

  // Load data from a Google Cloud Storage file into the table
  const [job] = await bigquery
    .dataset(datasetId)
    .table(tableId)
    .load(storage.bucket(bucketName).file(filename), metadata);

  // load() waits for the job to finish
  console.log(`Job ${job.id} completed.`);

  // Check the job's status for errors
  const errors = job.status.errors;
  if (errors && errors.length > 0) {
    throw errors;
  }
}

PHP

在尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 PHP 设置说明进行操作。如需了解详情,请参阅 BigQuery PHP API 参考文档

use Google\Cloud\BigQuery\BigQueryClient;
use Google\Cloud\Core\ExponentialBackoff;

/** Uncomment and populate these variables in your code */
// $projectId  = 'The Google project ID';
// $datasetId  = 'The BigQuery dataset ID';

// instantiate the bigquery table service
$bigQuery = new BigQueryClient([
    'projectId' => $projectId,
]);
$dataset = $bigQuery->dataset($datasetId);
$table = $dataset->table('us_states');

// create the import job
$gcsUri = 'gs://cloud-samples-data/bigquery/us-states/us-states.csv';
$schema = [
    'fields' => [
        ['name' => 'name', 'type' => 'string'],
        ['name' => 'post_abbr', 'type' => 'string']
    ]
];
$loadConfig = $table->loadFromStorage($gcsUri)->schema($schema)->skipLeadingRows(1);
$job = $table->runJob($loadConfig);
// poll the job until it is complete
$backoff = new ExponentialBackoff(10);
$backoff->execute(function () use ($job) {
    print('Waiting for job to complete' . PHP_EOL);
    $job->reload();
    if (!$job->isComplete()) {
        throw new Exception('Job has not yet completed', 500);
    }
});
// check if the job has errors
if (isset($job->info()['status']['errorResult'])) {
    $error = $job->info()['status']['errorResult']['message'];
    printf('Error running job: %s' . PHP_EOL, $error);
} else {
    print('Data imported successfully' . PHP_EOL);
}

Python

在尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Python 设置说明进行操作。如需了解详情,请参阅 BigQuery Python API 参考文档

使用 Client.load_table_from_uri() 方法从 Cloud Storage 中的 CSV 文件加载数据。通过将 LoadJobConfig.schema 属性设置为 SchemaField 对象列表,您可以提供明确的架构定义。

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to create.
# table_id = "your-project.your_dataset.your_table_name"

job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("name", "STRING"),
        bigquery.SchemaField("post_abbr", "STRING"),
    ],
    skip_leading_rows=1,
    # The source format defaults to CSV, so the line below is optional.
    source_format=bigquery.SourceFormat.CSV,
)
uri = "gs://cloud-samples-data/bigquery/us-states/us-states.csv"

load_job = client.load_table_from_uri(
    uri, table_id, job_config=job_config
)  # Make an API request.

load_job.result()  # Waits for the job to complete.

destination_table = client.get_table(table_id)  # Make an API request.
print("Loaded {} rows.".format(destination_table.num_rows))

Ruby

在尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Ruby 设置说明进行操作。如需了解详情,请参阅 BigQuery Ruby API 参考文档

require "google/cloud/bigquery"

def load_table_gcs_csv dataset_id = "your_dataset_id"
  bigquery = Google::Cloud::Bigquery.new
  dataset  = bigquery.dataset dataset_id
  gcs_uri  = "gs://cloud-samples-data/bigquery/us-states/us-states.csv"
  table_id = "us_states"

  load_job = dataset.load_job table_id, gcs_uri, skip_leading: 1 do |schema|
    schema.string "name"
    schema.string "post_abbr"
  end
  puts "Starting job #{load_job.job_id}"

  load_job.wait_until_done!  # Waits for table load to complete.
  puts "Job finished."

  table = dataset.table(table_id)
  puts "Loaded #{table.rows_count} rows to table #{table.id}"
end

将 CSV 数据加载到使用基于列的时间分区的表中

如需将 Cloud Storage 中的 CSV 数据加载到使用基于列的时间分区的 BigQuery 表中,请使用以下代码:

Go

在尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Go 设置说明进行操作。如需了解详情,请参阅 BigQuery Go API 参考文档


import (
	"context"
	"fmt"
	"time"

	"cloud.google.com/go/bigquery"
)

// importPartitionedTable demonstrates specifing time partitioning for a BigQuery table when loading
// CSV data from Cloud Storage.
func importPartitionedTable(projectID, destDatasetID, destTableID string) error {
	// projectID := "my-project-id"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	gcsRef := bigquery.NewGCSReference("gs://cloud-samples-data/bigquery/us-states/us-states-by-date.csv")
	gcsRef.SkipLeadingRows = 1
	gcsRef.Schema = bigquery.Schema{
		{Name: "name", Type: bigquery.StringFieldType},
		{Name: "post_abbr", Type: bigquery.StringFieldType},
		{Name: "date", Type: bigquery.DateFieldType},
	}
	loader := client.Dataset(destDatasetID).Table(destTableID).LoaderFrom(gcsRef)
	loader.TimePartitioning = &bigquery.TimePartitioning{
		Field:      "date",
		Expiration: 90 * 24 * time.Hour,
	}
	loader.WriteDisposition = bigquery.WriteEmpty

	job, err := loader.Run(ctx)
	if err != nil {
		return err
	}
	status, err := job.Wait(ctx)
	if err != nil {
		return err
	}

	if status.Err() != nil {
		return fmt.Errorf("job completed with error: %v", status.Err())
	}
	return nil
}

Java

试用此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 BigQuery Java API 参考文档

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TimePartitioning;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.UUID;

public class LoadPartitionedTable {

  public static void runLoadPartitionedTable() throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    String sourceUri = "/path/to/file.csv";
    loadPartitionedTable(datasetName, tableName, sourceUri);
  }

  public static void loadPartitionedTable(String datasetName, String tableName, String sourceUri)
      throws Exception {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      TableId tableId = TableId.of(datasetName, tableName);

      Schema schema =
          Schema.of(
              Field.of("name", StandardSQLTypeName.STRING),
              Field.of("post_abbr", StandardSQLTypeName.STRING),
              Field.of("date", StandardSQLTypeName.DATE));

      // Configure time partitioning. For full list of options, see:
      // https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TimePartitioning
      TimePartitioning partitioning =
          TimePartitioning.newBuilder(TimePartitioning.Type.DAY)
              .setField("date")
              .setExpirationMs(Duration.of(90, ChronoUnit.DAYS).toMillis())
              .build();

      LoadJobConfiguration loadJobConfig =
          LoadJobConfiguration.builder(tableId, sourceUri)
              .setFormatOptions(FormatOptions.csv())
              .setSchema(schema)
              .setTimePartitioning(partitioning)
              .build();

      // Create a job ID so that we can safely retry.
      JobId jobId = JobId.of(UUID.randomUUID().toString());
      Job loadJob = bigquery.create(JobInfo.newBuilder(loadJobConfig).setJobId(jobId).build());

      // Load data from a GCS parquet file into the table
      // Blocks until this load table job completes its execution, either failing or succeeding.
      Job completedJob = loadJob.waitFor();

      // Check for errors
      if (completedJob == null) {
        throw new Exception("Job not executed since it no longer exists.");
      } else if (completedJob.getStatus().getError() != null) {
        // You can also look at queryJob.getStatus().getExecutionErrors() for all
        // errors, not just the latest one.
        throw new Exception(
            "BigQuery was unable to load into the table due to an error: \n"
                + loadJob.getStatus().getError());
      }
      System.out.println("Data successfully loaded into time partitioned table during load job");
    } catch (BigQueryException | InterruptedException e) {
      System.out.println(
          "Data not loaded into time partitioned table during load job \n" + e.toString());
    }
  }
}

Node.js

在尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请查看 BigQuery Node.js API 参考文档

// Import the Google Cloud client libraries
const {BigQuery} = require('@google-cloud/bigquery');
const {Storage} = require('@google-cloud/storage');

// Instantiate clients
const bigquery = new BigQuery();
const storage = new Storage();

/**
 * This sample loads the CSV file at
 * https://storage.googleapis.com/cloud-samples-data/bigquery/us-states/us-states.csv
 *
 * TODO(developer): Replace the following lines with the path to your file.
 */
const bucketName = 'cloud-samples-data';
const filename = 'bigquery/us-states/us-states-by-date.csv';

async function loadTablePartitioned() {
  // Load data into a table that uses column-based time partitioning.

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'my_new_table';

  // Configure the load job. For full list of options, see:
  // https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad
  const partitionConfig = {
    type: 'DAY',
    expirationMs: '7776000000', // 90 days
    field: 'date',
  };

  const metadata = {
    sourceFormat: 'CSV',
    skipLeadingRows: 1,
    schema: {
      fields: [
        {name: 'name', type: 'STRING'},
        {name: 'post_abbr', type: 'STRING'},
        {name: 'date', type: 'DATE'},
      ],
    },
    location: 'US',
    timePartitioning: partitionConfig,
  };

  // Load data from a Google Cloud Storage file into the table
  const [job] = await bigquery
    .dataset(datasetId)
    .table(tableId)
    .load(storage.bucket(bucketName).file(filename), metadata);

  // load() waits for the job to finish
  console.log(`Job ${job.id} completed.`);

  // Check the job's status for errors
  const errors = job.status.errors;
  if (errors && errors.length > 0) {
    throw errors;
  }
}

Python

在尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Python 设置说明进行操作。如需了解详情,请参阅 BigQuery Python API 参考文档

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to create.
# table_id = "your-project.your_dataset.your_table_name"

job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("name", "STRING"),
        bigquery.SchemaField("post_abbr", "STRING"),
        bigquery.SchemaField("date", "DATE"),
    ],
    skip_leading_rows=1,
    time_partitioning=bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
        field="date",  # Name of the column to use for partitioning.
        expiration_ms=7776000000,  # 90 days.
    ),
)
uri = "gs://cloud-samples-data/bigquery/us-states/us-states-by-date.csv"

load_job = client.load_table_from_uri(
    uri, table_id, job_config=job_config
)  # Make an API request.

load_job.result()  # Wait for the job to complete.

table = client.get_table(table_id)
print("Loaded {} rows to table {}".format(table.num_rows, table_id))

使用 CSV 数据覆盖或附加到表

您可以通过添加来自源文件的数据或附加查询结果,将其他数据加载到表中。

在控制台或经典版 BigQuery 网页界面中,使用写入偏好设置 (Write preference) 选项指定从源文件或查询结果加载数据时要执行的操作。

将其他数据加载到表中时,可选择以下选项:

Cloud Console 选项 经典版网页界面选项 bq 命令行工具标志 BigQuery API 属性 说明
只写入空白表 Write if empty WRITE_EMPTY 仅当表为空时才写入数据。
附加到表 附加到表 --noreplace--replace=false;如果未指定 --[no]replace,则默认为附加 WRITE_APPEND (默认)在表末尾附加数据。
覆盖表 Overwrite table --replace--replace=true WRITE_TRUNCATE 清空表中所有现有数据然后再写入新数据。

如果将数据加载到现有表中,加载作业可以附加数据或覆盖表。

您可以通过以下方式附加或覆盖表:

  • 使用 Cloud Console 或经典版网页界面
  • 使用 bq 命令行工具的 bq load 命令
  • 调用 jobs.insert API 方法并配置 load 作业
  • 使用客户端库

控制台

  1. 在 Cloud Console 中打开 BigQuery 网页界面。
    转到 Cloud Console

  2. 在导航面板的资源部分中,展开您的项目并选择数据集。

  3. 在窗口右侧的详细信息面板中,点击创建表。在加载作业中附加和覆盖数据的过程与在加载作业中创建表的过程相同。

    创建表

  4. 创建表页面的来源部分,执行以下操作:

    • 基于以下数据创建表部分,选择 Cloud Storage。

    • 在来源字段中,浏览至或输入 Cloud Storage URI。请注意,BigQuery 网页界面不支持添加多个 URI,但支持使用通配符。Cloud Storage 存储分区必须与您要向其中附加数据或覆盖其数据的表所属的数据集位于同一位置。

      选择文件

    • 文件格式部分,选择 CSV

  5. 创建表页面的目标位置部分,执行以下操作:

    • 数据集名称部分,选择相应数据集。

      选择数据集

    • 表名称字段中,输入您要向其附加数据或覆盖其数据的 BigQuery 表的名称。

    • 确认表类型设置为原生表

  6. 架构部分的自动检测下,勾选架构和输入参数,以启用架构自动检测功能。或者,您可以通过以下方式手动输入架构定义:

    • 启用以文本形式修改,并以 JSON 数组格式输入表架构。

      以 JSON 数组格式添加架构

    • 使用添加字段手动输入架构。

      使用“添加字段”按钮添加架构定义

  7. 对于分区和聚簇设置,保留默认值。您无法通过对表执行附加或覆盖操作将表转换为分区表或聚簇表,并且 Cloud Console 不支持在加载作业中对分区表或聚簇表执行附加或覆盖操作。

  8. 点击高级选项

    • 写入偏好设置部分,选择附加到表覆盖表
    • 允许的错误数部分中,接受默认值 0 或输入可忽略的含错行数上限。如果含错行数超过此值,该作业将生成 invalid 消息并失败。
    • 未知值部分,勾选忽略未知值以忽略未出现在表架构中的行内的所有值。
    • 字段分隔符部分,选择用于分隔 CSV 文件中的单元格的字符:英文逗号制表符竖线符自定义分隔符。如果您选择自定义,请在自定义字段分隔符框中输入分隔符。默认值为英文逗号
    • Header rows to skip 部分,输入在 CSV 文件顶部要跳过的标题行数。默认值为 0
    • 引用的数据中包含换行符 (Quoted newlines) 部分,勾选允许引用的数据中包含换行符以允许 CSV 文件中引用的数据部分包含换行符。默认值为 false
    • 可选列留空的行 (Jagged rows) 部分,勾选允许使用可选列留空的行以接受 CSV 文件中末尾处缺少可选列的行。缺失值被视为 null 值。如果未勾选,则末尾处缺少列的记录将被视为错误记录;如果错误记录太多,作业结果中将返回一个无效错误。默认值为 false
    • 加密部分中,点击客户管理的密钥,以使用 Cloud Key Management Service 密钥。如果保留 Google 管理的密钥设置,BigQuery 将对静态数据进行加密

      覆盖表

  9. 点击创建表

经典版界面

  1. 转到 BigQuery 网页界面。
    转到 BigQuery 网页界面

  2. 在导航面板中,将鼠标悬停在数据集上,点击向下箭头图标 向下箭头图标图片,然后点击 Create new table。在加载作业中附加和覆盖数据的过程与在加载作业中创建表的过程相同。

  3. Create Table 页面的 Source Data 部分,执行以下操作:

    • Location 部分选择 Cloud Storage,然后在来源字段中输入 Cloud Storage URI。请注意,此界面不支持添加多个 URI,但支持使用通配符。Cloud Storage 存储分区必须与您要向其中附加数据或覆盖其数据的表所属的数据集位于同一位置。
    • File format 部分,选择 CSV
  4. Create Table 页面的 Destination Table 部分,执行以下操作:

    • Table name 部分,选择适当的数据集,然后在表名称字段中输入您要向其中附加数据或覆盖其数据的表的名称。
    • 确认 Table type 设置为 Native table
  5. Schema 部分中,输入架构定义。

    • 对于 CSV 文件,您可以勾选 Auto-detect 选项,以启用架构自动检测功能。

      自动检测链接

    • 您也可按照以下方式手动输入架构信息:

      • 点击 Edit as text,并以 JSON 数组格式输入表架构:

        以 JSON 数组格式添加架构

      • 使用添加字段手动输入架构:

        使用添加字段添加架构

  6. Options 部分,执行以下操作:

    • Field delimiter 部分,选择用于分隔 CSV 文件中的单元格的字符:CommaTabPipeOther。如果您选择 Other,请在 Custom field delimiter 框中输入分隔符。默认值为 Comma
    • Header rows to skip 部分,输入在 CSV 文件顶部要跳过的标题行数。默认值为 0
    • Number of errors allowed 部分中,接受默认值 0 或输入可忽略的含错行数上限。如果含错行数超过此值,该作业将生成 invalid 消息并失败。
    • Allow quoted newlines 部分,勾选方框以允许 CSV 文件中引用的数据部分包含换行符。默认值为 false
    • Allow jagged rows 部分,勾选方框以接受 CSV 文件中末尾处缺少可选列的行。缺失值被视为 null 值。如果未勾选,则末尾处缺少列的记录将被视为错误记录;如果错误记录太多,作业结果中将返回一个无效错误。默认值为 false
    • Ignore unknown values 部分,勾选方框以忽略未出现在表架构中的行内的所有值。
    • Write preference 部分,选择 Append to tableOverwrite table
    • 保留 Partitioning TypePartitioning FieldRequire partition filterClustering Fields 的默认值。您无法通过附加或覆盖表将表转换为分区表或聚簇表,并且网页界面不支持在加载作业中附加或覆盖分区表或聚簇表。
    • Destination encryption 部分中,选择 Customer-managed encryption,以使用 Cloud Key Management Service 密钥来加密表。如果保留 Default 设置,BigQuery 将使用 Google 管理的密钥对静态数据进行加密
  7. 点击 Create Table

bq

使用 bq load 命令,通过 --source_format 标志指定 CSV,并添加 Cloud Storage URI。您可以添加单个 URI、以英文逗号分隔的 URI 列表或含有通配符的 URI。

在架构定义文件中以内嵌形式提供架构,或者使用架构自动检测功能。

指定 --replace 标志可以覆盖表。使用 --noreplace 标志可向表附加数据。如果未指定标志,则默认附加数据。

您可以在向表执行附加或覆盖操作时修改表的架构。如需详细了解加载操作期间支持的架构更改,请参阅修改表架构

(可选)添加 --location 标志并将其值设置为您的位置

其他可选标志包括:

  • --allow_jagged_rows:指定此标志时,系统会接受 CSV 文件中末尾处缺少可选列的行。缺失值被视为 null 值。如果未勾选,则末尾处缺少列的记录将被视为错误记录;如果错误记录太多,作业结果中将返回一个无效错误。默认值为 false
  • --allow_quoted_newlines:指定此标志时,系统会允许 CSV 文件中引用的数据部分包含换行符。默认值为 false
  • --field_delimiter:此标志表示的是指明数据中各列之间边界的字符。可以使用 \ttab 作为制表符分隔符。默认值为 ,
  • --null_marker:此标志表示一个可选的自定义字符串,该字符串代表 CSV 数据中的一个 NULL 值。
  • --skip_leading_rows:指定在 CSV 文件顶部要跳过的标题行数。默认值为 0
  • --quote:此标志表示用于括起记录的引号字符。默认值为 "。如需表示无引号字符,请使用空字符串。
  • --max_bad_records:此标志表示一个整数,指定了作业中允许的最大错误记录数量,超过此数量之后,整个作业就会失败。默认值为 0。无论 --max_bad_records 值设为多少,系统最多只会返回 5 个任意类型的错误。
  • --ignore_unknown_values:如果指定此标志,系统会允许并忽略 CSV 或 JSON 数据中无法识别的额外值。
  • --autodetect:如果指定此标志,系统会为 CSV 和 JSON 数据启用架构自动检测功能。
  • --destination_kms_key:用于加密表数据的 Cloud KMS 密钥。
bq --location=location load \
--[no]replace \
--source_format=format \
dataset.table \
path_to_source \
schema

其中:

  • location 是您所在的位置--location 是可选标志。您可以使用 .bigqueryrc 文件设置位置的默认值。
  • formatCSV
  • dataset 是现有数据集。
  • table 是要向其中加载数据的表的名称。
  • path_to_source 是完全限定的 Cloud Storage URI 或以英文逗号分隔的 URI 列表。您还可以使用通配符
  • schema 是有效架构。该架构可以是本地 JSON 文件,也可以在命令中以内嵌形式输入架构。您还可以改用 --autodetect 标志,而无需提供架构定义。

示例:

以下命令可从 gs://mybucket/mydata.csv 加载数据并覆盖 mydataset 数据集中名为 mytable 的表。架构是使用架构自动检测功能定义的。

    bq load \
    --autodetect \
    --replace \
    --source_format=CSV \
    mydataset.mytable \
    gs://mybucket/mydata.csv

以下命令可从 gs://mybucket/mydata.csv 加载数据,并将数据附加到 mydataset 数据集中名为 mytable 的表。架构是使用 JSON 架构文件 myschema.json 定义的。

    bq load \
    --noreplace \
    --source_format=CSV \
    mydataset.mytable \
    gs://mybucket/mydata.csv \
    ./myschema.json

API

  1. 创建指向 Cloud Storage 中源数据的 load 作业。

  2. (可选)在作业资源 jobReference 部分的 location 属性中指定您的位置

  3. source URIs 属性必须是完全限定的,格式为 gs://bucket/object。您可以采用英文逗号分隔列表的形式添加多个 URI。请注意,系统也支持通配符

  4. configuration.load.sourceFormat 属性设置为 CSV,以指定数据格式。

  5. configuration.load.writeDisposition 属性设置为 WRITE_TRUNCATEWRITE_APPEND,以指定写入偏好设置。

Go

在尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Go 设置说明进行操作。如需了解详情,请参阅 BigQuery Go API 参考文档

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// importCSVTruncate demonstrates loading data from CSV data in Cloud Storage and overwriting/truncating
// data in the existing table.
func importCSVTruncate(projectID, datasetID, tableID string) error {
	// projectID := "my-project-id"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	gcsRef := bigquery.NewGCSReference("gs://cloud-samples-data/bigquery/us-states/us-states.csv")
	gcsRef.SourceFormat = bigquery.CSV
	gcsRef.AutoDetect = true
	gcsRef.SkipLeadingRows = 1
	loader := client.Dataset(datasetID).Table(tableID).LoaderFrom(gcsRef)
	loader.WriteDisposition = bigquery.WriteTruncate

	job, err := loader.Run(ctx)
	if err != nil {
		return err
	}
	status, err := job.Wait(ctx)
	if err != nil {
		return err
	}

	if status.Err() != nil {
		return fmt.Errorf("job completed with error: %v", status.Err())
	}
	return nil
}

Java

试用此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 BigQuery Java API 参考文档

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobInfo.WriteDisposition;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.TableId;

// Sample to overwrite the BigQuery table data by loading a CSV file from GCS
public class LoadCsvFromGcsTruncate {

  public static void runLoadCsvFromGcsTruncate() throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    String sourceUri = "gs://cloud-samples-data/bigquery/us-states/us-states.csv";
    loadCsvFromGcsTruncate(datasetName, tableName, sourceUri);
  }

  public static void loadCsvFromGcsTruncate(String datasetName, String tableName, String sourceUri)
      throws Exception {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      TableId tableId = TableId.of(datasetName, tableName);

      LoadJobConfiguration configuration =
          LoadJobConfiguration.builder(tableId, sourceUri)
              .setFormatOptions(FormatOptions.csv())
              // Set the write disposition to overwrite existing table data
              .setWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
              .build();

      // For more information on Job see:
      // https://googleapis.dev/java/google-cloud-clients/latest/index.html?com/google/cloud/bigquery/package-summary.html
      // Load the table
      Job loadJob = bigquery.create(JobInfo.of(configuration));

      // Load data from a GCS parquet file into the table
      // Blocks until this load table job completes its execution, either failing or succeeding.
      Job completedJob = loadJob.waitFor();

      // Check for errors
      if (completedJob == null) {
        throw new Exception("Job not executed since it no longer exists.");
      } else if (completedJob.getStatus().getError() != null) {
        // You can also look at queryJob.getStatus().getExecutionErrors() for all
        // errors, not just the latest one.
        throw new Exception(
            "BigQuery was unable to load into the table due to an error: \n"
                + loadJob.getStatus().getError());
      }
      System.out.println("Table is successfully overwritten by CSV file loaded from GCS");
    } catch (BigQueryException | InterruptedException e) {
      System.out.println("Column not added during load append \n" + e.toString());
    }
  }
}

Node.js

在尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请查看 BigQuery Node.js API 参考文档

如需替换现有表中的行,请将 metadata 参数中的 writeDisposition 值设置为 'WRITE_TRUNCATE'

// Import the Google Cloud client libraries
const {BigQuery} = require('@google-cloud/bigquery');
const {Storage} = require('@google-cloud/storage');

// Instantiate clients
const bigquery = new BigQuery();
const storage = new Storage();

/**
 * This sample loads the CSV file at
 * https://storage.googleapis.com/cloud-samples-data/bigquery/us-states/us-states.csv
 *
 * TODO(developer): Replace the following lines with the path to your file.
 */
const bucketName = 'cloud-samples-data';
const filename = 'bigquery/us-states/us-states.csv';

async function loadCSVFromGCSTruncate() {
  /**
   * Imports a GCS file into a table and overwrites
   * table data if table already exists.
   */

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'my_table';

  // Configure the load job. For full list of options, see:
  // https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad
  const metadata = {
    sourceFormat: 'CSV',
    skipLeadingRows: 1,
    schema: {
      fields: [
        {name: 'name', type: 'STRING'},
        {name: 'post_abbr', type: 'STRING'},
      ],
    },
    // Set the write disposition to overwrite existing table data.
    writeDisposition: 'WRITE_TRUNCATE',
    location: 'US',
  };

  // Load data from a Google Cloud Storage file into the table
  const [job] = await bigquery
    .dataset(datasetId)
    .table(tableId)
    .load(storage.bucket(bucketName).file(filename), metadata);
  // load() waits for the job to finish
  console.log(`Job ${job.id} completed.`);

  // Check the job's status for errors
  const errors = job.status.errors;
  if (errors && errors.length > 0) {
    throw errors;
  }
}

在尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 PHP 设置说明进行操作。如需了解详情,请参阅 BigQuery PHP API 参考文档

use Google\Cloud\BigQuery\BigQueryClient;
use Google\Cloud\Core\ExponentialBackoff;

/** Uncomment and populate these variables in your code */
// $projectId = 'The Google project ID';
// $datasetId = 'The BigQuery dataset ID';
// $tableId = 'The BigQuery table ID';

// instantiate the bigquery table service
$bigQuery = new BigQueryClient([
    'projectId' => $projectId,
]);
$table = $bigQuery->dataset($datasetId)->table($tableId);

// create the import job
$gcsUri = 'gs://cloud-samples-data/bigquery/us-states/us-states.csv';
$loadConfig = $table->loadFromStorage($gcsUri)->skipLeadingRows(1)->writeDisposition('WRITE_TRUNCATE');
$job = $table->runJob($loadConfig);

// poll the job until it is complete
$backoff = new ExponentialBackoff(10);
$backoff->execute(function () use ($job) {
    print('Waiting for job to complete' . PHP_EOL);
    $job->reload();
    if (!$job->isComplete()) {
        throw new Exception('Job has not yet completed', 500);
    }
});

// check if the job has errors
if (isset($job->info()['status']['errorResult'])) {
    $error = $job->info()['status']['errorResult']['message'];
    printf('Error running job: %s' . PHP_EOL, $error);
} else {
    print('Data imported successfully' . PHP_EOL);
}

Python

在尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Python 设置说明进行操作。如需了解详情,请参阅 BigQuery Python API 参考文档

如需替换现有表中的行,请将 LoadJobConfig.write_disposition 属性设置为 SourceFormat 常量 WRITE_TRUNCATE

import six

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to create.
# table_id = "your-project.your_dataset.your_table_name

job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("name", "STRING"),
        bigquery.SchemaField("post_abbr", "STRING"),
    ],
)

body = six.BytesIO(b"Washington,WA")
client.load_table_from_file(body, table_id, job_config=job_config).result()
previous_rows = client.get_table(table_id).num_rows
assert previous_rows > 0

job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
)

uri = "gs://cloud-samples-data/bigquery/us-states/us-states.csv"
load_job = client.load_table_from_uri(
    uri, table_id, job_config=job_config
)  # Make an API request.

load_job.result()  # Waits for the job to complete.

destination_table = client.get_table(table_id)
print("Loaded {} rows.".format(destination_table.num_rows))

加载 Hive 分区 CSV 数据

BigQuery 支持加载存储在 Cloud Storage 上的 Hive 分区 CSV 数据,并将 Hive 分区列作为目标 BigQuery 托管表中的列进行填充。如需了解详情,请参阅从 Cloud Storage 加载外部分区数据

CSV 数据加载详情

本部分介绍 BigQuery 如何处理各种 CSV 格式选项。

编码

BigQuery 预期 CSV 数据采用 UTF-8 编码。如果您的 CSV 文件数据采用 ISO-8859-1(也称为 Latin-1)格式编码,则应明确指明编码格式,以便 BigQuery 将其正确转换为 UTF-8 格式。

如果您没有指定编码,或在 CSV 文件不是 UTF-8 编码时指定 UTF-8 编码,则 BigQuery 会尝试将数据转换为 UTF-8 格式。一般来说,您的数据将成功加载,但可能并不是每个字节都符合您的预期。要避免此情况,请使用 --encoding 标志指定正确的编码。

如果 BigQuery 无法转换除 ASCII 0 字符之外的字符,BigQuery 会将该字符转换为标准 Unicode 替换字符:�。

字段分隔符

CSV 文件中的分隔符可以是任何单字节字符。如果源文件使用 ISO-8859-1 编码,则任何字符都可以作为分隔符。如果源文件使用 UTF-8 编码,则可以使用十进制范围 1-127 (U+0001-U+007F) 中的任何字符,无需进行修改。您可以插入此范围之外的 ISO-8859-1 字符作为分隔符,BigQuery 会进行正确解读。但是,如果您使用多字节字符作为分隔符,则某些字节会被错误地解读为字段值的一部分。

一般而言,最佳做法是使用标准分隔符,例如制表符、竖线符或英文逗号。默认为英文逗号。

数据类型

布尔值。BigQuery 可以解析以下任意布尔数据对:1 或 0、true 或 false、t 或 f、yes 或 no、y 或 n(均不区分大小写)。架构自动检测功能将自动检测上述除 0 和 1 之外的所有值。

Date。类型为 DATE 的列必须采用 YYYY-MM-DD 格式。

日期时间。类型为 DATETIME 的列必须采用 YYYY-MM-DD HH:MM:SS[.SSSSSS] 格式。

时间。类型为 TIME 的列必须采用 HH:MM:SS[.SSSSSS] 格式。

Timestamp。BigQuery 接受各种时间戳格式。时间戳必须包含日期部分和时间部分。

  • 日期部分的格式可以是 YYYY-MM-DD,也可以是 YYYY/MM/DD

  • 时间戳部分必须采用 HH:MM[:SS[.SSSSSS]] 格式(秒和毫秒是可选的)。

  • 日期和时间必须用空格或“T”分隔。

  • (可选)日期和时间可后跟世界协调时间 (UTC) 偏移量或世界协调时间 (UTC) 地区指示符 (Z)。如需了解详情,请参阅时区

例如,以下所有值都是有效的时间戳值:

  • 2018-08-19 12:11
  • 2018-08-19 12:11:35
  • 2018-08-19 12:11:35.22
  • 2018/08/19 12:11
  • 2018-07-05 12:54:00 UTC
  • 2018-08-19 07:11:35.220 -05:00
  • 2018-08-19T12:11:35.220Z

如果您提供一个架构,BigQuery 可将 Unix Epoch 时间作为时间戳值。但是,架构自动检测不会检测这种情况,而是将值视为数值或字符串类型。

Unix Epoch 时间戳值示例:

  • 1534680695
  • 1.534680695e11

CSV 选项

如需更改 BigQuery 解析 CSV 数据的方式,请在控制台、经典版网页界面、bq 命令行工具或 API 中指定其他选项。

如需详细了解 CSV 格式,请参阅 RFC 4180

CSV 选项 控制台选项 经典版界面选项 bq 工具标志 BigQuery API 属性 说明
字段分隔符 字段分隔符:英文逗号、制表符、竖线符及自定义分隔符 字段分隔符:英文逗号、制表符、竖线符及其他 -F--field_delimiter fieldDelimiter (可选)CSV 文件中的字段的分隔符。该分隔符可以是任何 ISO-8859-1 单字节字符。若要使用 128-255 范围内的字符,必须对字符进行 UTF8 编码。BigQuery 会将字符串转换为 ISO-8859-1 编码格式,并使用编码字符串的第一个字节来拆分原始二进制状态的数据。BigQuery 也支持使用转义序列“\t”来指定制表符分隔符。默认值为英文逗号(“,”)。
标题行数 需跳过的标题行数 Header rows to skip --skip_leading_rows skipLeadingRows (可选)表示源数据中标题行数的整数。
允许的错误记录数 允许的错误数 Number of errors allowed --max_bad_records maxBadRecords (可选)BigQuery 在运行作业时可忽略的错误记录数上限。如果错误记录数超过该值,作业结果中将返回无效错误。默认值为 0,表示所有记录都必须有效。
换行符 允许引号括起来的内容中包含换行符 Allow quoted newlines --allow_quoted_newlines allowQuotedNewlines (可选)指示是否允许 CSV 文件中引用的数据部分包含换行符。默认值为 false。
自定义 null 值 --null_marker nullMarker (可选)指定表示 CSV 文件中 null 值的字符串。 例如,如果指定“\N”,BigQuery 将在加载 CSV 文件时将“\N”解读为 null 值。默认值为空字符串。如果将此属性设置为自定义值,那么在除 STRING 和 BYTE 之外的任意数据类型出现空字符串的情况下,BigQuery 将抛出一个错误。对于 STRING 和 BYTE 列,BigQuery 会将空字符串解读为空值。
末尾处可选列 允许使用可选列留空的行 Allow jagged rows --allow_jagged_rows allowJaggedRows (可选)接受末尾处缺少可选列的行。缺失值被视为 null 值。如果值为 false,则末尾处缺失列的记录将被视为错误记录;如果错误记录太多,作业结果中将返回一个无效记录错误。默认值为 false。仅适用于 CSV,其他格式可忽略此选项。
未知值 忽略未知值 Ignore unknown values --ignore_unknown_values ignoreUnknownValues (可选)表示 BigQuery 是否应允许表架构中不存在的额外值。如果值为 true,将忽略额外值。如果值为 false,则含有额外列的记录将被视为错误记录;如果错误记录太多,作业结果中将返回一个无效记录错误。默认值为 false。sourceFormat 属性决定了 BigQuery 将哪些值视为额外值:
  • CSV:末尾处的列
  • JSON:与任何列名称均不匹配的指定值
引用 --quote quote (可选)用于括起 CSV 文件中数据部分的值。 BigQuery 会将字符串转换为 ISO-8859-1 编码格式,并使用编码字符串的第一个字节来拆分原始二进制状态的数据。默认值为英文双引号 (")。如果您的数据不包含括起部分,则将属性值设置为空字符串。如果您的数据包含括起的换行符,则还必须将 allowQuotedNewlines 属性设置为 true。 要在引用值中包含特定引号字符,请在前面加上一个匹配的引号字符。例如,如果您要转义默认字符“ " ”,请使用“ "" ”。
编码 -E--encoding encoding (可选)数据的字符编码。支持的值为 UTF-8 或 ISO-8859-1 编码格式。默认值为 UTF-8 格式。使用 quotefieldDelimiter 属性的值拆分原始二进制数据后,BigQuery 会对数据进行解码。