从 Cloud Storage 加载 CSV 数据

从 Cloud Storage 加载 CSV 文件

从 Cloud Storage 加载 CSV 数据时,可以将数据加载到新的表或分区中,也可以将数据附加到现有的表或分区或覆盖现有的表或分区。在数据加载到 BigQuery 后,系统会将其转换为适用于 Capacitor 的列式格式(BigQuery 的存储格式)。

如果要将 Cloud Storage 中的数据加载到 BigQuery 表,则要包含该表的数据集必须与相应 Cloud Storage 存储分区位于同一区域或多区域位置。

要了解如何从本地文件加载 CSV 数据,请参阅将本地数据源中的数据加载到 BigQuery 中

限制

如果要将 Cloud Storage 中的 CSV 数据加载到 BigQuery,请注意以下事项:

  • CSV 文件不支持嵌套或重复数据。
  • 如果使用 gzip 压缩,BigQuery 将无法并行读取数据。与加载未压缩数据相比,将压缩的 CSV 数据加载到 BigQuery 的速度较为缓慢。
  • 加载 CSV 或 JSON 数据时,DATE 列的值必须使用英文短划线 (-) 分隔符,并且日期必须采用以下格式:YYYY-MM-DD(年-月-日)。
  • 加载 JSON 或 CSV 数据时,TIMESTAMP 列的值必须使用英文短划线 (-) 分隔符来分隔时间戳的日期部分,并且日期必须采用以下格式:YYYY-MM-DD(年-月-日)。时间戳的 hh:mm:ss(时-分-秒)部分必须使用英文冒号 (:) 分隔符。

CSV 编码

BigQuery 需要的 CSV 数据为采用 UTF-8 编码。如果您的 CSV 文件包含采用 ISO-8859-1(也称为 Latin-1)格式编码的数据,则应在加载数据时明确指定编码格式,以便其转换为 UTF-8 格式。

CSV 文件中的分隔符可以是任何 ISO-8859-1 单字节字符。若要使用 128-255 范围内的字符,必须对字符进行 UTF-8 编码。BigQuery 会将字符串转换为 ISO-8859-1 编码格式,并使用编码字符串的第一个字节来分隔原始二进制状态的数据。

所需权限

在将数据加载到 BigQuery 时,您需要拥有项目级或数据集级的权限,才能将数据加载到新的或现有的 BigQuery 表和分区中。如果要从 Cloud Storage 加载数据,您还需要访问包含该数据的存储分区。

BigQuery 权限

如果您要将数据从 Cloud Storage 加载到 BigQuery 中,必须被授予项目级或数据集级的 bigquery.dataOwnerbigquery.dataEditor 角色。这两个角色允许用户和组将数据加载到新表中,或将数据附加到现有表或覆盖现有表。

若授予项目级角色,则用户或组有权将数据加载到项目中任一数据集的表中。若授予数据集级角色,则用户或组只能将数据加载到该数据集的表中。

如需详细了解如何配置数据集访问权限,请参阅控制对数据集的访问权限。要详细了解 BigQuery 中的 IAM 角色,请参阅访问权限控制

Cloud Storage 权限

要从 Cloud Storage 存储分区加载数据,您必须被授予项目级或相应单个存储分区的 storage.objects.get 权限。如果要使用 URI 通配符,您还必须拥有 storage.objects.list 权限。

可以通过授予预定义的 IAM 角色 storage.objectViewer 来提供 storage.objects.getstorage.objects.list 权限。

将 CSV 数据加载到表中

要将 Cloud Storage 中的 CSV 数据加载到新 BigQuery 表中,或将数据附加到现有表中,请执行以下操作:

Console

  1. 在 GCP Console 中打开 BigQuery 网页界面。
    转到 BigQuery 网页界面

  2. 在导航面板的资源部分中,展开您的项目并选择数据集。

  3. 在窗口右侧的详细信息面板中,点击创建表。加载数据的过程与创建空表的过程相同。

    查看数据集

  4. 创建表页面的来源部分,执行以下操作:

    • 基于以下数据创建表部分,选择所需的来源类型。

    • 在来源字段中,浏览并找到相应的文件/Cloud Storage 存储分区,或输入 Cloud Storage URI。请注意,BigQuery 网页界面不支持添加多个 URI,但支持使用通配符。Cloud Storage 存储分区必须与您要创建的表所属的数据集位于同一位置。

      查看数据集

    • 文件格式部分,请选择 CSV

  5. 创建表页面的目标位置部分,执行以下操作:

    • 数据集名称部分,选择适当的数据集。

      查看数据集

    • 表名称字段中,输入您要在 BigQuery 中创建的表的名称。

    • 验证表类型是否设置为原生表

  6. 架构部分中,输入架构定义。

    • 通过以下方式,手动输入架构信息:

      • 启用以文本形式修改,并以 JSON 数组格式输入表架构。

      • 使用添加字段手动输入架构。

  7. 高级选项部分中选择适用项,然后点击创建表。要了解适用选项,请参阅 CSV 选项

传统版界面

  1. 转到 BigQuery 网页界面。
    转到 BigQuery 网页界面

  2. 在导航面板中,将鼠标悬停在数据集上,点击向下箭头图标 向下箭头图标图片,然后点击 Create new table。加载数据的过程与创建空表的过程相同。

  3. Create Table 页面的 Source Data 部分,执行以下操作:

    • Location 部分选择 Cloud Storage,然后在来源字段中输入 Cloud Storage URI。请注意,BigQuery 网页界面不支持添加多个 URI,但支持使用通配符。Cloud Storage 存储分区必须与您要创建的表所属的数据集位于同一位置。
    • File format 部分,选择 Comma-separated values (CSV)
  4. Create Table 页面的 Destination Table 部分,执行以下操作:

    • Table name 部分,选择适当的数据集,然后在表名称字段中输入要在 BigQuery 中创建的表的名称。
    • 确认 Table type 是否设置为 Native table
  5. Schema 部分中,输入架构定义。

    • 通过以下方式,手动输入架构信息:

      • 点击 Edit as text,并以 JSON 数组格式输入表架构:

        以 JSON 数组格式添加架构

      • 使用 Add Field 手动输入架构:

        使用添加字段添加架构

  6. Options 部分中选择适用项,然后点击 Create Table。要了解适用选项,请参阅 CSV 选项

命令行

使用 bq load 命令,将 source_format 指定为 CSV,并添加 Cloud Storage URI。您可以添加单个 URI、以英文逗号分隔的 URI 列表或含有通配符的 URI。

提供 --location 标志并将值设置为您的位置

bq --location=[LOCATION] load --source_format=[FORMAT] [DATASET].[TABLE] [PATH_TO_SOURCE] [SCHEMA]

其中:

  • [LOCATION] 是您的位置。--location 是可选标志。例如,如果您在东京区域使用 BigQuery,可将该标志的值设置为 asia-northeast1。您可以使用 .bigqueryrc 文件设置位置的默认值。
  • [FORMAT] 为 CSV。
  • [DATASET] 是现有数据集。
  • [TABLE] 是要向其中加载数据的表的名称。
  • [PATH_TO_SOURCE] 是完全限定的 Cloud Storage URI 或以英文逗号分隔的 URI 列表。您还可以使用通配符
  • [SCHEMA] 是一个有效架构。该架构可以是本地 JSON 文件,也可以作为命令的一部分通过内嵌方式输入。您还可以使用 --autodetect 标志,而无需提供架构定义。

此外,您还可以为 CSV 选项添加标志,以控制 BigQuery 解析数据的方式。例如,您可以使用 --skip_leading_rows 标志来忽略 CSV 文件中的标题行,还可以使用 --encoding 标志来标识数据的字符编码。

示例:

  • 以下命令可将 gs://mybucket/mydata.csv 中的数据加载到 mydataset 中名为 mytable 的表中。架构是在名为 myschema.json 的本地架构文件中定义的。mybucketmydataset 创建于 US 多区域位置。

    bq --location=US load --source_format=CSV mydataset.mytable gs://mybucket/mydata.csv ./myschema.json
    
  • 以下命令可将 gs://mybucket/mydata.csv 中的数据加载到 mydataset 中名为 mytable 的表中。架构采用以下格式通过内嵌方式定义:[FIELD]:[DATA_TYPE], [FIELD]:[DATA_TYPE]mybucketmydataset 创建于 US 多区域位置。

    bq --location=US load --source_format=CSV mydataset.mytable gs://mybucket/mydata.csv qtr:STRING,sales:FLOAT,year:STRING
    

    使用命令行指定架构时,不能添加 RECORD (STRUCT) 类型和字段说明,也不能指定字段模式。所有字段模式均默认为 NULLABLE。如需添加字段说明、模式和 RECORD 类型,请改为提供 JSON 架构文件。

  • 以下命令可将 gs://mybucket/mydata.csv 中的数据加载到 mydataset 中名为 mytable 的表中。架构是在名为 myschema.json 的本地架构文件中定义的,并使用 --skip_leading_rows 标志忽略 CSV 文件中的前两个标题行。mybucketmydataset 创建于 asia-northeast1 区域。

    bq --location=asia-northeast1 load --skip_leading_rows=2 --source_format=CSV mydataset.mytable gs://mybucket/mydata.csv ./myschema.json
    

API

设置以下属性以使用 API 加载 CSV 数据。

  1. 创建指向 Cloud Storage 中源数据的加载作业

  2. 作业资源jobReference 部分的 location 属性中,指定您的位置

  3. 源 URI 必须是完全限定的,格式为:gs://[BUCKET]/[OBJECT]。每个 URI 都可以包含一个“*”通配符

  4. configuration.load.sourceFormat 属性设置为 CSV,以指定 CSV 数据格式。

  5. 要检查作业状态,请调用 jobs.get([JOB_ID]*),其中 [JOB_ID] 是初始请求返回的作业的 ID。

    • 如果 status.state = DONE,则表示作业已成功完成。
    • 如果存在 status.errorResult 属性,则请求失败,并且该对象将包含描述所出现的问题的相关信息。如果请求失败,则不创建任何表且不添加任何数据。
    • 如果未出现 status.errorResult,则表示作业已成功完成,但可能存在一些非严重错误,如导入一些行时出错。非严重错误列在返回的作业对象的 status.errors 属性中。

API 说明:

  • 加载作业不可分割。也就是说,如果加载作业失败,则所有数据都不可用;如果加载作业成功,则所有数据全部可用。

  • 在调用 jobs.insert() 来创建加载作业时,最佳做法是生成唯一 ID,并将其作为 jobReference.jobId 传递。此方法比较不会受到网络故障的影响,因为客户端可以对已知的作业 ID 进行轮询或重试。

  • 对指定的作业 ID 调用 jobs.insert() 具有幂等性,即,您可以对同一作业 ID 进行无限次重试,但最多只会有一个成功操作。

C#

在尝试此示例之前,请先按照 BigQuery 快速入门:使用客户端库中的 C# 设置说明进行操作。如需了解详情,请参阅 BigQuery C# API 参考文档

using Google.Cloud.BigQuery.V2;
using System;

public class BigQueryLoadTableGcsCsv
{
    public void LoadTableGcsCsv(
        string projectId = "your-project-id",
        string datasetId = "your_dataset_id"
    )
    {
        BigQueryClient client = BigQueryClient.Create(projectId);
        var gcsURI = "gs://cloud-samples-data/bigquery/us-states/us-states.csv";
        var dataset = client.GetDataset(datasetId);
        var schema = new TableSchemaBuilder {
            { "name", BigQueryDbType.String },
            { "post_abbr", BigQueryDbType.String }
        }.Build();
        var destinationTableRef = dataset.GetTableReference(
            tableId: "us_states");
        // Create job configuration
        var jobOptions = new CreateLoadJobOptions()
        {
            // The source format defaults to CSV; line below is optional.
            SourceFormat = FileFormat.Csv,
            SkipLeadingRows = 1
        };
        // Create and run job
        var loadJob = client.CreateLoadJob(
            sourceUri: gcsURI, destination: destinationTableRef,
            schema: schema, options: jobOptions);
        loadJob.PollUntilCompleted();  // Waits for the job to complete.
        // Display the number of rows uploaded
        BigQueryTable table = client.GetTable(destinationTableRef);
        Console.WriteLine(
            $"Loaded {table.Resource.NumRows} rows to {table.FullyQualifiedId}");
    }
}

Go

试用此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Go 设置说明进行操作。如需了解详情,请参阅 BigQuery Go API 参考文档

// To run this sample, you will need to create (or reuse) a context and
// an instance of the bigquery client.  For example:
// import "cloud.google.com/go/bigquery"
// ctx := context.Background()
// client, err := bigquery.NewClient(ctx, "your-project-id")
gcsRef := bigquery.NewGCSReference("gs://cloud-samples-data/bigquery/us-states/us-states.csv")
gcsRef.SkipLeadingRows = 1
gcsRef.Schema = bigquery.Schema{
	{Name: "name", Type: bigquery.StringFieldType},
	{Name: "post_abbr", Type: bigquery.StringFieldType},
}
loader := client.Dataset(datasetID).Table(tableID).LoaderFrom(gcsRef)
loader.WriteDisposition = bigquery.WriteEmpty

job, err := loader.Run(ctx)
if err != nil {
	return err
}
status, err := job.Wait(ctx)
if err != nil {
	return err
}

if status.Err() != nil {
	return fmt.Errorf("Job completed with error: %v", status.Err())
}

Java

试用此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Java 设置说明进行操作。如需了解详情,请参阅 BigQuery Java API 参考文档

Job job = table.load(FormatOptions.csv(), sourceUri);
// Wait for the job to complete
try {
  Job completedJob =
      job.waitFor(
          RetryOption.initialRetryDelay(Duration.ofSeconds(1)),
          RetryOption.totalTimeout(Duration.ofMinutes(3)));
  if (completedJob != null && completedJob.getStatus().getError() == null) {
    // Job completed successfully
  } else {
    // Handle error case
  }
} catch (InterruptedException e) {
  // Handle interrupted wait
}

Node.js

试用此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 BigQuery Node.js API 参考文档

// Import the Google Cloud client libraries
const {BigQuery} = require('@google-cloud/bigquery');
const {Storage} = require('@google-cloud/storage');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const datasetId = "my_dataset";
// const tableId = "my_table";

/**
 * This sample loads the CSV file at
 * https://storage.googleapis.com/cloud-samples-data/bigquery/us-states/us-states.csv
 *
 * TODO(developer): Replace the following lines with the path to your file.
 */
const bucketName = 'cloud-samples-data';
const filename = 'bigquery/us-states/us-states.csv';

async function loadCSVFromGCS() {
  // Imports a GCS file into a table with manually defined schema.

  // Instantiate clients
  const bigqueryClient = new BigQuery();
  const storageClient = new Storage();

  // Configure the load job. For full list of options, see:
  // https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load
  const metadata = {
    sourceFormat: 'CSV',
    skipLeadingRows: 1,
    schema: {
      fields: [
        {name: 'name', type: 'STRING'},
        {name: 'post_abbr', type: 'STRING'},
      ],
    },
    location: 'US',
  };

  // Load data from a Google Cloud Storage file into the table
  const [job] = await bigqueryClient
    .dataset(datasetId)
    .table(tableId)
    .load(storageClient.bucket(bucketName).file(filename), metadata);

  // load() waits for the job to finish
  console.log(`Job ${job.id} completed.`);

  // Check the job's status for errors
  const errors = job.status.errors;
  if (errors && errors.length > 0) {
    throw errors;
  }
}
loadCSVFromGCS();

PHP

试用此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 PHP 设置说明进行操作。如需了解详情,请参阅 BigQuery PHP API 参考文档

use Google\Cloud\BigQuery\BigQueryClient;
use Google\Cloud\Core\ExponentialBackoff;

/** Uncomment and populate these variables in your code */
// $projectId  = 'The Google project ID';
// $datasetId  = 'The BigQuery dataset ID';

// instantiate the bigquery table service
$bigQuery = new BigQueryClient([
    'projectId' => $projectId,
]);
$dataset = $bigQuery->dataset($datasetId);
$table = $dataset->table('us_states');

// create the import job
$gcsUri = 'gs://cloud-samples-data/bigquery/us-states/us-states.csv';
$schema = [
    'fields' => [
        ['name' => 'name', 'type' => 'string'],
        ['name' => 'post_abbr', 'type' => 'string']
    ]
];
$loadConfig = $table->loadFromStorage($gcsUri)->schema($schema)->skipLeadingRows(1);
$job = $table->runJob($loadConfig);
// poll the job until it is complete
$backoff = new ExponentialBackoff(10);
$backoff->execute(function () use ($job) {
    print('Waiting for job to complete' . PHP_EOL);
    $job->reload();
    if (!$job->isComplete()) {
        throw new Exception('Job has not yet completed', 500);
    }
});
// check if the job has errors
if (isset($job->info()['status']['errorResult'])) {
    $error = $job->info()['status']['errorResult']['message'];
    printf('Error running job: %s' . PHP_EOL, $error);
} else {
    print('Data imported successfully' . PHP_EOL);
}

Python

试用此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Python 设置说明进行操作。如需了解详情,请参阅 BigQuery Python API 参考文档

使用 Client.load_table_from_uri() 方法从 Cloud Storage 中的 CSV 文件加载数据。通过将 LoadJobConfig.schema 属性设置为 SchemaField 对象列表,您可以提供明确的架构定义。

# from google.cloud import bigquery
# client = bigquery.Client()
# dataset_id = 'my_dataset'

dataset_ref = client.dataset(dataset_id)
job_config = bigquery.LoadJobConfig()
job_config.schema = [
    bigquery.SchemaField("name", "STRING"),
    bigquery.SchemaField("post_abbr", "STRING"),
]
job_config.skip_leading_rows = 1
# The source format defaults to CSV, so the line below is optional.
job_config.source_format = bigquery.SourceFormat.CSV
uri = "gs://cloud-samples-data/bigquery/us-states/us-states.csv"

load_job = client.load_table_from_uri(
    uri, dataset_ref.table("us_states"), job_config=job_config
)  # API request
print("Starting job {}".format(load_job.job_id))

load_job.result()  # Waits for table load to complete.
print("Job finished.")

destination_table = client.get_table(dataset_ref.table("us_states"))
print("Loaded {} rows.".format(destination_table.num_rows))

Ruby

在尝试此示例之前,请先按照 BigQuery 快速入门:使用客户端库中的 Ruby 设置说明进行操作。如需了解详情,请参阅 BigQuery Ruby API 参考文档

require "google/cloud/bigquery"

def load_table_gcs_csv dataset_id = "your_dataset_id"
  bigquery = Google::Cloud::Bigquery.new
  dataset  = bigquery.dataset dataset_id
  gcs_uri  = "gs://cloud-samples-data/bigquery/us-states/us-states.csv"
  table_id = "us_states"

  load_job = dataset.load_job table_id, gcs_uri, skip_leading: 1 do |schema|
    schema.string "name"
    schema.string "post_abbr"
  end
  puts "Starting job #{load_job.job_id}"

  load_job.wait_until_done!  # Waits for table load to complete.
  puts "Job finished."

  table = dataset.table(table_id)
  puts "Loaded #{table.rows_count} rows to table #{table.id}"
end

通过架构自动检测功能加载 CSV 数据

Console

  1. 在 GCP Console 中打开 BigQuery 网页界面。
    转到 BigQuery 网页界面

  2. 在导航面板的资源部分中,展开您的项目并选择数据集。

  3. 在窗口右侧的详细信息面板中,点击创建表。加载数据的过程与创建空表的过程相同。

    查看数据集

  4. 创建表页面的来源部分,执行以下操作:

    • 基于以下数据创建表部分,选择所需的来源类型。

    • 在来源字段中,浏览并找到相应的文件/Cloud Storage 存储分区,或输入 Cloud Storage URI。请注意,BigQuery 网页界面不支持添加多个 URI,但支持使用通配符。Cloud Storage 存储分区必须与您要创建的表所属的数据集位于同一位置。

      查看数据集

    • 文件格式部分,请选择 CSV

  5. 创建表页面的目标位置部分,执行以下操作:

    • 数据集名称部分,选择适当的数据集。

      查看数据集

    • 表名称字段中,输入您要在 BigQuery 中创建的表的名称。

    • 确认表类型是否设置为原生表

  6. 架构部分的自动检测下,勾选架构和输入参数,以启用架构自动检测功能。

    自动检测链接

  7. 高级选项部分中选择适用项,然后点击创建表。要了解适用选项,请参阅 CSV 选项

传统版界面

  1. 转到 BigQuery 网页界面。
    转到 BigQuery 网页界面

  2. 在导航面板中,将鼠标悬停在数据集上,点击向下箭头图标 向下箭头图标图片,然后点击 Create new table。加载数据的过程与创建空表的过程相同。

  3. Create Table 页面的 Source Data 部分,执行以下操作:

    • Location 部分选择 Cloud Storage,然后在来源字段中输入 Cloud Storage URI。请注意,BigQuery 网页界面不支持添加多个 URI,但支持使用通配符。Cloud Storage 存储分区必须与您要创建的表所属的数据集位于同一位置。
    • File format 部分,选择 Comma-separated values (CSV)
  4. Create Table 页面的 Destination Table 部分,执行以下操作:

    • Table name 部分,选择适当的数据集,然后在表名称字段中输入要在 BigQuery 中创建的表的名称。
    • 确认 Table type 是否设置为 Native table
  5. Schema 部分中,勾选 Auto-detect 选项,以启用架构自动检测功能。

    自动检测链接

  6. Options 部分中选择适用项,然后点击 Create Table。要了解适用选项,请参阅 CSV 选项

命令行

使用 bq load 命令,将 source_format 指定为 CSV,并添加 Cloud Storage URI。您可以添加单个 URI、以英文逗号分隔的 URI 列表或含有通配符的 URI。

提供 --location 标志并将值设置为您的位置

bq --location=[LOCATION] load --autodetect --source_format=[FORMAT] [DATASET].[TABLE] [PATH_TO_SOURCE]

其中:

  • [LOCATION] 是您的位置。--location 是可选标志。例如,如果您在东京区域使用 BigQuery,可将该标志的值设置为 asia-northeast1。您可以使用 .bigqueryrc 文件设置该位置的默认值。
  • --autodetect 标志用于启用架构自动检测功能。
  • [FORMAT] 为 CSV。
  • [DATASET] 是现有数据集。
  • [TABLE] 是要向其中加载数据的表的名称。
  • [PATH_TO_SOURCE] 是完全限定的 Cloud Storage URI 或以英文逗号分隔的 URI 列表。您还可以使用通配符

此外,您还可以为 CSV 选项添加标志,以控制 BigQuery 解析数据的方式。例如,您可以使用 --skip_leading_rows 标志来忽略 CSV 文件中的标题行,还可以使用 --encoding 标志来标识数据的字符编码。

示例:

  • 以下命令可将 gs://mybucket/mydata.csv 中的数据加载到 mydataset 中名为 mytable 的表中。架构是使用架构自动检测功能定义的。 mybucketmydataset 创建于 US 多区域位置。

    bq --location=US load --autodetect --source_format=CSV mydataset.mytable gs://mybucket/mydata.csv
    
  • 以下命令可将 gs://mybucket/ 中多个文件的数据加载到 mydataset 中名为 mytable 的表中。Cloud Storage URI 使用通配符,架构是使用架构自动检测功能定义的。 mybucketmydataset 创建于 asia-northeast1 区域。

    bq --location=asia-northeast1 load --autodetect --source_format=CSV mydataset.mytable gs://mybucket/mydata*.csv
    
  • 以下命令可将 gs://mybucket/ 中多个文件的数据加载到 mydataset 中名为 mytable 的表中。该命令包含以英文逗号分隔的 Cloud Storage URI 列表,架构是使用架构自动检测功能定义的。 mybucketmydataset 创建于 asia-northeast1 区域。

    bq --location=asia-northeast1 load --autodetect --source_format=CSV mydataset.mytable "gs://mybucket/myfile.csv,gs://mybucket/myfile2.csv"
    

API

设置以下属性以使用 API 加载 CSV 数据。

  1. 创建指向 Cloud Storage 中源数据的加载作业

  2. 作业资源jobReference 部分的 location 属性中,指定您的位置

  3. 源 URI 必须是完全限定的,格式为:gs://[BUCKET]/[OBJECT]。每个 URI 都可以包含一个“*”通配符

  4. configuration.load.sourceFormat 属性设置为 CSV,以指定 CSV 数据格式。

  5. 要检查作业状态,请调用 jobs.get([JOB_ID]*),其中 [JOB_ID] 是初始请求返回的作业的 ID。

    • 如果 status.state = DONE,则表示作业已成功完成。
    • 如果存在 status.errorResult 属性,则请求失败,并且该对象将包含描述所出现的问题的相关信息。如果请求失败,则不创建任何表且不添加任何数据。
    • 如果未出现 status.errorResult,则表示作业已成功完成,但可能存在一些非严重错误,如导入一些行时出错。非严重错误列在返回的作业对象的 status.errors 属性中。

API 说明:

  • 加载作业不可分割。也就是说,如果加载作业失败,则所有数据都不可用;如果加载作业成功,则所有数据全部可用。

  • 在调用 jobs.insert() 来创建加载作业时,最佳做法是生成唯一 ID,并将其作为 jobReference.jobId 传递。此方法比较不会受到网络故障的影响,因为客户端可以对已知的作业 ID 进行轮询或重试。

  • 对指定的作业 ID 调用 jobs.insert() 具有幂等性,即,您可以对同一作业 ID 进行无限次重试,但最多只会有一个成功操作。

Go

试用此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Go 设置说明进行操作。如需了解详情,请参阅 BigQuery Go API 参考文档

gcsRef := bigquery.NewGCSReference("gs://cloud-samples-data/bigquery/us-states/us-states.csv")
gcsRef.SourceFormat = bigquery.CSV
gcsRef.AutoDetect = true
gcsRef.SkipLeadingRows = 1
loader := client.Dataset(datasetID).Table(tableID).LoaderFrom(gcsRef)

job, err := loader.Run(ctx)
if err != nil {
	return err
}
status, err := job.Wait(ctx)
if err != nil {
	return err
}

if status.Err() != nil {
	return fmt.Errorf("job completed with error: %v", status.Err())
}

Node.js

试用此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 BigQuery Node.js API 参考文档

// Import the Google Cloud client libraries
const {BigQuery} = require('@google-cloud/bigquery');
const {Storage} = require('@google-cloud/storage');

/**
 * TODO(developer): Uncomment the following lines before running the sample
 */
// const datasetId = "my_dataset";
// const tableId = "my_table";

/**
 * This sample loads the CSV file at
 * https://storage.googleapis.com/cloud-samples-data/bigquery/us-states/us-states.csv
 *
 * TODO(developer): Replace the following lines with the path to your file
 */
const bucketName = 'cloud-samples-data';
const filename = 'bigquery/us-states/us-states.csv';

async function loadCSVFromGCSAutodetect() {
  // Imports a GCS file into a table with autodetected schema.

  // Instantiate clients
  const bigqueryClient = new BigQuery();
  const storageClient = new Storage();

  // Configure the load job. For full list of options, see:
  // https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load
  const metadata = {
    sourceFormat: 'CSV',
    skipLeadingRows: 1,
    autodetect: true,
    location: 'US',
  };

  // Load data from a Google Cloud Storage file into the table
  const [job] = await bigqueryClient
    .dataset(datasetId)
    .table(tableId)
    .load(storageClient.bucket(bucketName).file(filename), metadata);
  // load() waits for the job to finish
  console.log(`Job ${job.id} completed.`);

  // Check the job's status for errors
  const errors = job.status.errors;
  if (errors && errors.length > 0) {
    throw errors;
  }
}
loadCSVFromGCSAutodetect();

PHP

试用此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 PHP 设置说明进行操作。如需了解详情,请参阅 BigQuery PHP API 参考文档

use Google\Cloud\BigQuery\BigQueryClient;
use Google\Cloud\Core\ExponentialBackoff;

/** Uncomment and populate these variables in your code */
// $projectId  = 'The Google project ID';
// $datasetId  = 'The BigQuery dataset ID';

// instantiate the bigquery table service
$bigQuery = new BigQueryClient([
    'projectId' => $projectId,
]);
$dataset = $bigQuery->dataset($datasetId);
$table = $dataset->table('us_states');

// create the import job
$gcsUri = 'gs://cloud-samples-data/bigquery/us-states/us-states.csv';
$loadConfig = $table->loadFromStorage($gcsUri)->autodetect(true)->skipLeadingRows(1);
$job = $table->runJob($loadConfig);
// poll the job until it is complete
$backoff = new ExponentialBackoff(10);
$backoff->execute(function () use ($job) {
    print('Waiting for job to complete' . PHP_EOL);
    $job->reload();
    if (!$job->isComplete()) {
        throw new Exception('Job has not yet completed', 500);
    }
});
// check if the job has errors
if (isset($job->info()['status']['errorResult'])) {
    $error = $job->info()['status']['errorResult']['message'];
    printf('Error running job: %s' . PHP_EOL, $error);
} else {
    print('Data imported successfully' . PHP_EOL);
}

Python

试用此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Python 设置说明进行操作。如需了解详情,请参阅 BigQuery Python API 参考文档

LoadJobConfig.autodetect 属性设置为 True,以便 BigQuery 根据输入数据样本来推断架构:

# from google.cloud import bigquery
# client = bigquery.Client()
# dataset_id = 'my_dataset'

dataset_ref = client.dataset(dataset_id)
job_config = bigquery.LoadJobConfig()
job_config.autodetect = True
job_config.skip_leading_rows = 1
# The source format defaults to CSV, so the line below is optional.
job_config.source_format = bigquery.SourceFormat.CSV
uri = "gs://cloud-samples-data/bigquery/us-states/us-states.csv"
load_job = client.load_table_from_uri(
    uri, dataset_ref.table("us_states"), job_config=job_config
)  # API request
print("Starting job {}".format(load_job.job_id))

load_job.result()  # Waits for table load to complete.
print("Job finished.")

destination_table = client.get_table(dataset_ref.table("us_states"))
print("Loaded {} rows.".format(destination_table.num_rows))

使用 CSV 数据覆盖表

您可以通过添加来自源文件的数据或附加查询结果,将其他数据加载到表中。如果数据架构与目标表或分区的架构不匹配,您可以在执行附加或覆盖操作时更新架构。

如果您要在附加数据时更新架构,BigQuery 将允许执行以下操作:

  • 添加新字段
  • REQUIRED 字段放宽为 NULLABLE

如果您要覆盖表,则架构始终会被覆盖。在覆盖表时,架构更新不受限制。

在 Console 或经典版 BigQuery 网页界面中,使用 Write preference 选项指定从源文件或查询结果加载数据时要执行的操作。CLI 和 API 包括以下选项:

Console 选项 经典版界面选项 CLI 标志 BigQuery API 属性 说明
只写入空白表 Write if empty WRITE_EMPTY 仅当表为空时才写入数据。
附加到表 Append to table --noreplace--replace=false;如果未指定 --[no]replace,则默认为附加 WRITE_APPEND (默认)在表末尾附加数据。
覆盖表 Overwrite table --replace--replace=true WRITE_TRUNCATE 清空表中所有现有数据然后再写入新数据。

默认情况下,除非写入处置方式发生更改,否则加载作业会将数据附加到表中。如果您希望将数据替换为加载作业中的数据,则可以选择覆盖 BigQuery 表中的数据:

Console

  1. 在 GCP Console 中打开 BigQuery 网页界面。
    转到 BigQuery 网页界面

  2. 在导航面板的资源部分中,展开您的项目并选择数据集。

  3. 在窗口右侧的详细信息面板中,点击创建表。加载数据的过程与创建空表的过程相同。

    创建表

  4. 创建表页面的来源部分,执行以下操作:

    • 基于以下数据创建表部分,选择所需的来源类型。

    • 在来源字段中,浏览并找到相应的文件/Cloud Storage 存储分区,或输入 Cloud Storage URI。请注意,BigQuery 网页界面不支持添加多个 URI,但支持使用通配符。Cloud Storage 存储分区必须与您要创建的表所属的数据集位于同一位置。

      选择文件

    • 文件格式部分,请选择 CSV

  5. 创建表页面的目标位置部分,执行以下操作:

    • 数据集名称部分,选择适当的数据集。

      选择数据集

    • 表名称字段中,输入您要在 BigQuery 中创建的表的名称。

    • 验证表类型是否设置为原生表

  6. 架构部分中,输入架构定义。

    • 通过以下方式,手动输入架构信息:

      • 启用以文本形式修改,并以 JSON 数组格式输入表架构。

      • 使用添加字段手动输入架构。

  7. 高级选项部分,为写入偏好设置选择只写入空白表附加到表覆盖表

    查看数据集

  8. 点击创建表

传统版界面

  1. 转到 BigQuery 网页界面。
    转到 BigQuery 网页界面

  2. 在导航面板中,将鼠标悬停在数据集上,点击向下箭头图标 向下箭头图标图片,然后点击 Create new table。加载数据的过程与创建空表的过程相同。

  3. Create Table 页面的 Source Data 部分,执行以下操作:

    • Location 部分,选择 Cloud Storage,然后在来源字段中输入 Cloud Storage URI。请注意,此界面不支持添加多个 URI,但支持使用通配符。Cloud Storage 存储分区必须与您要向其中附加数据或覆盖其数据的表所属的数据集位于同一位置。
    • File format 部分,选择 Comma-separated values (CSV)
  4. Create Table 页面的 Destination Table 部分,执行以下操作:

    • Table name 部分,选择适当的数据集,然后在表名称字段中输入您要向其中附加数据或覆盖其数据的表的名称。
    • 确认 Table type 是否设置为 Native table
  5. Schema 部分中,输入架构定义。

    • 对于 CSV 文件,您可以勾选 Auto-detect 选项,以启用架构自动检测功能。

      自动检测链接

    • 您也可以通过以下方式手动输入架构信息:

      • 点击 Edit as text,并以 JSON 数组格式输入表架构:

        以 JSON 数组格式添加架构

      • 使用 Add Field 手动输入架构:

        使用添加字段添加架构

  6. Options 部分,为 Write preference 选择 Write if emptyAppend to tableOverwrite table

    使用添加字段添加架构

  7. 点击 Create Table

命令行

输入带 --replace 标志的 bq load 命令以覆盖表。提供 --location 标志并将值设置为您的位置。使用 --noreplace 标志将数据附加到表中。如果未指定标志,则默认附加数据。

向表附加数据或覆盖表时,可以使用 --schema_update_option 标志来将目标表的架构更新为新数据的架构。以下选项可与 --schema_update_option 标志结合使用:

  • ALLOW_FIELD_ADDITION:向架构添加新字段;新字段不能为 REQUIRED
  • ALLOW_FIELD_RELAXATION:将必填字段放宽为可以为 Null;重复此选项可指定值列表
bq --location=[LOCATION] load --[no]replace [DATASET].[TABLE] [PATH_TO_SOURCE] [SCHEMA]

其中:

  • [LOCATION] 是您的位置--location 是可选标志。您可以使用 .bigqueryrc 文件设置默认位置值。
  • [DATASET] 是现有数据集。
  • [TABLE] 是要向其中加载数据的表的名称。
  • [PATH_TO_SOURCE] 是完全限定的 Cloud Storage URI 或以英文逗号分隔的 URI 列表。您还可以使用通配符
  • [SCHEMA] 是一个有效架构。该架构可以是本地 JSON 文件,也可以作为命令的一部分通过内嵌方式输入。您还可以使用 --autodetect 标志,而无需提供架构定义。

此外,您还可以为 CSV 选项添加标志,以控制 BigQuery 解析 CSV 数据的方式。例如,您可以使用 --skip_leading_rows 标志来忽略 CSV 文件中的标题行,还可以使用 --encoding 标志来标识数据的字符编码。

示例:

  • 以下命令可从 gs://mybucket/mydata.csv 加载数据并覆盖 mydataset 中的 mytable 表。架构是通过架构自动检测功能定义的。mybucketmydataset 创建于 US 多区域位置。

    bq --location=US load --autodetect --replace --source_format=CSV mydataset.mytable gs://mybucket/mydata.csv
    
  • 以下命令可从 gs://mybucket/mydata.csv 加载数据,并将数据附加到 mydataset 中的 mytable 表。架构是使用 JSON 架构文件 (myschema.json) 定义的。mybucketmydataset 创建于 US 多区域位置。

    bq --location=US load --autodetect --noreplace --source_format=CSV mydataset.mytable gs://mybucket/mydata.csv ./myschema.json
    
  • 以下命令可从 gs://mybucket/mydata.csv 加载数据,并将数据附加到 mydataset 中的 mytable 表。使用了名为 myschema.json 的本地 JSON 架构文件。架构定义包含目标表中不存在的新字段。mybucketmydataset 创建于 asia-northeast1 区域。

    bq --location=asia-northeast1 load --noreplace --schema_update_option=ALLOW_FIELD_ADDITION --source_format=CSV mydataset.mytable gs://mybucket/mydata.csv ./myschema.json
    
  • 以下命令可从 gs://mybucket/mydata.csv 加载数据,并将数据附加到 mydataset 中的 mytable 表。使用了名为 myschema.json 的本地 JSON 架构文件。架构定义将两个 REQUIRED 字段更改(放宽)为 NULLABLEmybucketmydataset 创建于 asia-northeast1 区域。

    bq --location=asia-northeast1 load --noreplace --schema_update_option=ALLOW_FIELD_RELAXATION --source_format=CSV mydataset.mytable gs://mybucket/mydata.csv ./myschema.json
    

API

设置以下属性以使用 API 加载 CSV 数据。

  1. 创建指向 Cloud Storage 中源数据的加载作业

  2. 作业资源jobReference 部分的 location 属性中,指定您的位置

  3. 源 URI 必须是完全限定的,格式为:gs://[BUCKET]/[OBJECT]。要添加多个 URI,可采用英文逗号分隔列表的形式。请注意,从 Cloud Storage 加载 CSV 数据时,也可使用通配符

  4. configuration.load.sourceFormat 属性设置为 CSV,以指定数据格式。

  5. configuration.load.writeDisposition 属性设置为 WRITE_TRUNCATEWRITE_APPENDWRITE_EMPTY,以指定写入偏好设置。

  6. 如需更新加载作业中的架构,请将 configuration.load.schemaUpdateOptions 属性设置为 ALLOW_FIELD_ADDITIONALLOW_FIELD_RELAXATION

Go

试用此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Go 设置说明进行操作。如需了解详情,请参阅 BigQuery Go API 参考文档

// To run this sample, you will need to create (or reuse) a context and
// an instance of the bigquery client.  For example:
// import "cloud.google.com/go/bigquery"
// ctx := context.Background()
// client, err := bigquery.NewClient(ctx, "your-project-id")
gcsRef := bigquery.NewGCSReference("gs://cloud-samples-data/bigquery/us-states/us-states.csv")
gcsRef.SourceFormat = bigquery.CSV
gcsRef.AutoDetect = true
gcsRef.SkipLeadingRows = 1
loader := client.Dataset(datasetID).Table(tableID).LoaderFrom(gcsRef)
loader.WriteDisposition = bigquery.WriteTruncate

job, err := loader.Run(ctx)
if err != nil {
	return err
}
status, err := job.Wait(ctx)
if err != nil {
	return err
}

if status.Err() != nil {
	return fmt.Errorf("job completed with error: %v", status.Err())
}

Node.js

试用此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 BigQuery Node.js API 参考文档

要替换现有表中的行,请将 metadata 参数中的 writeDisposition 值设置为 'WRITE_TRUNCATE'

// Import the Google Cloud client libraries
const {BigQuery} = require('@google-cloud/bigquery');
const {Storage} = require('@google-cloud/storage');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const datasetId = "my_dataset";
// const tableId = "my_table";

/**
 * This sample loads the CSV file at
 * https://storage.googleapis.com/cloud-samples-data/bigquery/us-states/us-states.csv
 *
 * TODO(developer): Replace the following lines with the path to your file.
 */
const bucketName = 'cloud-samples-data';
const filename = 'bigquery/us-states/us-states.csv';

async function loadCSVFromGCSTruncate() {
  /**
   * Imports a GCS file into a table and overwrites
   * table data if table already exists.
   */

  // Instantiate clients
  const bigqueryClient = new BigQuery();
  const storageClient = new Storage();

  // Configure the load job. For full list of options, see:
  // https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load
  const metadata = {
    sourceFormat: 'CSV',
    skipLeadingRows: 1,
    schema: {
      fields: [
        {name: 'name', type: 'STRING'},
        {name: 'post_abbr', type: 'STRING'},
      ],
    },
    // Set the write disposition to overwrite existing table data.
    writeDisposition: 'WRITE_TRUNCATE',
    location: 'US',
  };

  // Load data from a Google Cloud Storage file into the table
  const [job] = await bigqueryClient
    .dataset(datasetId)
    .table(tableId)
    .load(storageClient.bucket(bucketName).file(filename), metadata);
  // load() waits for the job to finish
  console.log(`Job ${job.id} completed.`);

  // Check the job's status for errors
  const errors = job.status.errors;
  if (errors && errors.length > 0) {
    throw errors;
  }
}
loadCSVFromGCSTruncate();

PHP

试用此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 PHP 设置说明进行操作。如需了解详情,请参阅 BigQuery PHP API 参考文档

use Google\Cloud\BigQuery\BigQueryClient;
use Google\Cloud\Core\ExponentialBackoff;

/** Uncomment and populate these variables in your code */
// $projectId = 'The Google project ID';
// $datasetId = 'The BigQuery dataset ID';
// $tableId = 'The BigQuery table ID';

// instantiate the bigquery table service
$bigQuery = new BigQueryClient([
    'projectId' => $projectId,
]);
$table = $bigQuery->dataset($datasetId)->table($tableId);

// create the import job
$gcsUri = 'gs://cloud-samples-data/bigquery/us-states/us-states.csv';
$loadConfig = $table->loadFromStorage($gcsUri)->skipLeadingRows(1)->writeDisposition('WRITE_TRUNCATE');
$job = $table->runJob($loadConfig);

// poll the job until it is complete
$backoff = new ExponentialBackoff(10);
$backoff->execute(function () use ($job) {
    print('Waiting for job to complete' . PHP_EOL);
    $job->reload();
    if (!$job->isComplete()) {
        throw new Exception('Job has not yet completed', 500);
    }
});

// check if the job has errors
if (isset($job->info()['status']['errorResult'])) {
    $error = $job->info()['status']['errorResult']['message'];
    printf('Error running job: %s' . PHP_EOL, $error);
} else {
    print('Data imported successfully' . PHP_EOL);
}

Python

试用此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Python 设置说明进行操作。如需了解详情,请参阅 BigQuery Python API 参考文档

如需替换现有表中的行,请将 LoadJobConfig.write_disposition 属性设置为 SourceFormat 常量 WRITE_TRUNCATE

# from google.cloud import bigquery
# client = bigquery.Client()
# table_ref = client.dataset('my_dataset').table('existing_table')

job_config = bigquery.LoadJobConfig()
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
job_config.skip_leading_rows = 1
# The source format defaults to CSV, so the line below is optional.
job_config.source_format = bigquery.SourceFormat.CSV
uri = "gs://cloud-samples-data/bigquery/us-states/us-states.csv"
load_job = client.load_table_from_uri(
    uri, table_ref, job_config=job_config
)  # API request
print("Starting job {}".format(load_job.job_id))

load_job.result()  # Waits for table load to complete.
print("Job finished.")

destination_table = client.get_table(table_ref)
print("Loaded {} rows.".format(destination_table.num_rows))

CSV 选项

如需更改 BigQuery 解析 CSV 数据的方式,请在 Console、传统版界面、CLI 或 API 中指定其他选项。如需详细了解 CSV 格式,请参阅 RFC 4180

CSV 选项 Console 选项 经典版界面选项 CLI 标志 BigQuery API 属性 说明
字段分隔符 字段分隔符:英文逗号、制表符、竖线符及自定义分隔符 Field delimiter: Comma, Tab, Pipe, Other -F--field_delimiter fieldDelimiter (可选)CSV 文件中的字段的分隔符。该分隔符可以是任何 ISO-8859-1 单字节字符。要使用 128-255 范围内的字符,必须对字符进行 UTF8 编码。BigQuery 会将字符串转换为 ISO-8859-1 编码格式,并使用编码字符串的第一个字节来分隔原始二进制状态的数据。BigQuery 也支持使用转义序列“\t”来指定制表符分隔符。默认值为英文逗号(“,”)。
标题行数 要跳过的标题行数 Header rows to skip --skip_leading_rows skipLeadingRows (可选)表示源数据中标题行数的整数。
允许的错误记录数 允许的错误数 Number of errors allowed --max_bad_records maxBadRecords (可选)BigQuery 在运行作业时可忽略的错误记录数上限。如果错误记录数超过该值,作业结果中将返回无效错误。默认值为 0,表示所有记录都必须有效。
换行符 允许引用的数据中包含换行符 Allow quoted newlines --allow_quoted_newlines allowQuotedNewlines (可选)指示是否允许 CSV 文件中引用的数据部分包含换行符。默认值为 false。
自定义 null 值 --null_marker nullMarker (可选)指定表示 CSV 文件中 null 值的字符串。例如,如果指定“\N”,BigQuery 将在加载 CSV 文件时将“\N”解读为 null 值。默认值为空字符串。如果将此属性设置为自定义值,那么在除 STRING 和 BYTE 之外的任意数据类型出现空字符串的情况下,BigQuery 将抛出一个错误。对于 STRING 和 BYTE 列,BigQuery 会将空字符串解读为空值。
末尾处可选列 允许使用可选列留空的行 Allow jagged rows --allow_jagged_rows allowJaggedRows (可选)接受末尾处缺少可选列的行。缺失值被视为 null 值。如果值为 false,则末尾处缺失列的记录将被视为错误记录;如果错误记录太多,作业结果中将返回一个无效记录错误。默认值为 false。仅适用于 CSV,其他格式可忽略此选项。
未知值 忽略未知值 Ignore unknown values --ignore_unknown_values ignoreUnknownValues (可选)表示 BigQuery 是否应允许表架构中不存在的额外值。如果值为 true,将忽略额外值。如果值为 false,则含有额外列的记录将被视为错误记录;如果错误记录太多,作业结果中将返回一个无效记录错误。默认值为 false。sourceFormat 属性决定了 BigQuery 将哪些值视为额外值:
  • CSV:末尾处的列
  • JSON:与任何列名称均不匹配的指定值
引用 --quote quote (可选)用于引用 CSV 文件中数据部分的值。BigQuery 会将字符串转换为 ISO-8859-1 编码格式,并使用编码字符串的第一个字节来分隔原始二进制状态的数据。默认值为英文双引号(“"”)。如果您的数据不包含引用部分,则将属性值设置为空字符串。如果您的数据包含引用的换行符,则还必须将 allowQuotedNewlines 属性设置为 true
编码 -E--encoding encoding (可选)数据的字符编码。支持的值为 UTF-8 或 ISO-8859-1 编码格式。默认值为 UTF-8 格式。使用 quotefieldDelimiter 属性的值拆分原始二进制数据后,BigQuery 会对数据进行解码。
此页内容是否有用?请给出您的反馈和评价:

发送以下问题的反馈:

此网页
需要帮助?请访问我们的支持页面