从 Cloud Storage 加载 Parquet 数据

本页面概述了如何将 Cloud Storage 中的 Parquet 数据加载到 BigQuery 中。

Parquet 是一种面向列的开源数据格式,在 Apache Hadoop 生态系统中得到广泛运用。

从 Cloud Storage 加载 Parquet 数据时,可以将数据加载到新的表或分区中,也可以将其附加到或覆盖现有的表或分区。数据加载到 BigQuery 后,系统会将其转换为适用于 Capacitor 的列式格式(BigQuery 的存储格式)。

如需将 Cloud Storage 中的数据加载到 BigQuery 表,则包含该表的数据集必须与相应 Cloud Storage 存储分区位于同一区域或多区域位置。

如需了解如何从本地文件加载 Parquet 数据,请参阅从本地文件加载数据

Parquet 架构

在您将 Parquet 文件加载到 BigQuery 时,系统会自动从自描述源数据中检索表架构。BigQuery 从源数据中检索架构时,将使用按字母顺序显示的最后一个文件。

例如,您在 Cloud Storage 中具有以下 Parquet 文件:

gs://mybucket/00/
  a.parquet
  z.parquet
gs://mybucket/01/
  b.parquet

bq 命令行工具中运行此命令时,系统会加载所有文件(以英文逗号分隔列表形式),而架构来源于 mybucket/01/b.parquet

bq load \
--source_format=PARQUET \
dataset.table \
"gs://mybucket/00/*.parquet","gs://mybucket/01/*.parquet"

加载具有不同架构的多个 Parquet 文件时,多个架构中指定的相同列必须在每个架构定义中拥有相同的模式

当 BigQuery 检测架构时,会将某些 Parquet 数据类型转换为 BigQuery 数据类型,使其与 BigQuery SQL 语法兼容。如需了解详情,请参阅 Parquet 转换

Parquet 压缩

BigQuery 支持对 Parquet 文件中的数据块使用以下压缩编解码器:

  • GZip
  • LZO_1C 并且 LZO_1X
  • Snappy
  • ZSTD

所需权限

在将数据加载到 BigQuery 时,您需要拥有相关权限,才能运行加载作业并将数据加载到新的或现有的 BigQuery 表和分区中。如果要从 Cloud Storage 加载数据,您还需要拥有对您的数据所在的存储分区的访问权限。

BigQuery 权限

至少需具备以下权限,才能将数据加载到 BigQuery。无论您是要将数据加载到新的表或分区,还是要附加到或覆盖现有的表或分区,都必须具备这些权限。

  • bigquery.tables.create
  • bigquery.tables.updateData
  • bigquery.jobs.create

以下预定义 IAM 角色同时具有 bigquery.tables.createbigquery.tables.updateData 权限:

  • bigquery.dataEditor
  • bigquery.dataOwner
  • bigquery.admin

以下预定义的 IAM 角色包含 bigquery.jobs.create 权限:

  • bigquery.user
  • bigquery.jobUser
  • bigquery.admin

此外,如果用户具有 bigquery.datasets.create 权限,则当该用户创建数据集时,系统会为其授予该数据集的 bigquery.dataOwner 访问权限。借助 bigquery.dataOwner 访问权限,用户可以通过使用加载作业在数据集中创建和更新表。

如需详细了解 BigQuery 中的 IAM 角色和权限,请参阅访问权限控制

Cloud Storage 权限

如需从 Cloud Storage 存储分区中加载数据,您必须获得 storage.objects.get 权限。如果要使用 URI 通配符,您还必须具有 storage.objects.list 权限。

授予预定义的 IAM 角色 storage.objectViewer,即可同时提供 storage.objects.getstorage.objects.list 权限。

将 Parquet 数据加载到新表

您可以使用以下方式之一将 Parquet 数据加载到新表中:

  • Cloud Console
  • bq 命令行工具的 bq load 命令
  • 调用 jobs.insert API 方法并配置 load 作业
  • 客户端库

如需将 Parquet 数据从 Cloud Storage 加载到新的 BigQuery 表中,请执行以下操作:

控制台

  1. 在 Cloud Console 中打开 BigQuery 页面。

    转到 BigQuery 页面

  2. 探索器面板中,展开您的项目并选择数据集。

  3. 在详细信息面板中,点击创建表

  4. 创建表页面的来源部分,执行以下操作:

    • 基于以下数据创建表部分,选择 Cloud Storage。

    • 在来源字段中,浏览至或输入 Cloud Storage URI。请注意,Cloud Console 不支持添加多个 URI,但支持使用通配符。Cloud Storage 存储分区必须与您要创建的表所属的数据集位于同一位置。

      选择文件。

    • 文件格式部分,选择 Parquet

  5. 创建表格页面的目标位置部分,执行以下操作:

    • 数据集名称部分,选择相应数据集。

      查看数据集。

    • 确认表类型设置为原生表

    • 表名称字段中,输入您要在 BigQuery 中创建的表的名称。

  6. 架构部分中,无需执行任何操作。架构在 Parquet 文件中为自描述形式。

  7. (可选)如需对表进行分区,请在分区和聚簇设置中选择相应选项:

    • 如需创建分区表,请点击不进行分区,选择按字段分区 (Partition by field),然后选择 DATETIMESTAMP 列。如果架构不包含 DATETIMESTAMP 列,则此选项不可用。
    • 如需创建提取时间分区表,请点击不进行分区,然后选择按提取时间分区
  8. (可选)在分区过滤条件中,点击需要分区过滤条件框,以要求用户添加 WHERE 子句来指定要查询的分区。要求使用分区过滤条件可以减少费用并提高性能。如需了解详情,请参阅查询分区表。如果已选择不进行分区,则此选项不可用。

  9. (可选)如需对该表进行聚簇,请在聚簇顺序框中,输入一到四个字段名称。

  10. (可选)点击高级选项

    • 写入偏好设置部分,选中只写入空白表。此选项创建一个新表并向其中加载数据。
    • 允许的错误数部分中,接受默认值 0 或输入可忽略的含错行数上限。如果包含错误的行数超过此值,该作业将生成 invalid 消息并失败。
    • 未知值部分,取消选中忽略未知值。 此选项仅适用于 CSV 和 JSON 文件。
    • 加密部分中,点击客户管理的密钥,以使用 Cloud Key Management Service 密钥。如果保留 Google 管理的密钥设置,BigQuery 将对静态数据进行加密
  11. 点击创建表

bq

使用 bq load 命令、通过 --source_format 标志指定 PARQUET,并添加 Cloud Storage URI。您可以添加单个 URI、以英文逗号分隔的 URI 列表或含有通配符的 URI。

(可选)提供 --location 标志并将其值设置为您的位置

其他可选标志包括:

  • --time_partitioning_type:此标志会在表上启用基于时间的分区,并设置分区类型。可能的值包括 HOURDAYMONTHYEAR。当您创建按 DATEDATETIMETIMESTAMP 列分区的表时,可选用此标志。基于时间的分区的默认分区类型为 DAY
  • --time_partitioning_expiration:此标志值为一个整数,指定了应在何时删除基于时间的分区(以秒为单位)。过期时间以分区的世界协调时间 (UTC) 日期加上这个整数值为准。
  • --time_partitioning_field:此标志表示用于创建分区表DATETIMESTAMP 列。如果在未提供此值的情况下启用了基于时间的分区,系统会创建提取时间分区表
  • --require_partition_filter:启用后,此选项会要求用户添加 WHERE 子句来指定要查询的分区。要求使用分区过滤条件可以减少费用并提高性能。如需了解详情,请参阅查询分区表
  • --clustering_fields:此标志表示以英文逗号分隔的列名称列表(最多包含 4 个列名称),用于创建聚簇表
  • --destination_kms_key:用于加密表数据的 Cloud KMS 密钥。

    如需详细了解分区表,请参阅:

    如需详细了解聚簇表,请参阅:

    如需详细了解表加密,请参阅以下部分:

如需将 Parquet 数据加载到 BigQuery,请输入以下命令:

bq --location=LOCATION load \
--source_format=FORMAT \
DATASET.TABLE \
PATH_TO_SOURCE

请替换以下内容:

  • LOCATION:您所在的位置。--location 是可选标志。例如,如果您在东京区域使用 BigQuery,可将该标志的值设置为 asia-northeast1。您可以使用 .bigqueryrc 文件设置位置的默认值。
  • FORMATPARQUET
  • DATASET:现有数据集。
  • TABLE:要向其中加载数据的表的名称。
  • PATH_TO_SOURCE 是完全限定的 Cloud Storage URI 或以英文逗号分隔的 URI 列表。系统也支持使用通配符

示例:

以下命令将 gs://mybucket/mydata.parquet 中的数据加载到 mydataset 中名为 mytable 的表中。

    bq load \
    --source_format=PARQUET \
    mydataset.mytable \
    gs://mybucket/mydata.parquet

以下命令将 gs://mybucket/mydata.parquet 中的数据加载到 mydataset 中名为 mytable 的提取时间分区表中。

    bq load \
    --source_format=PARQUET \
    --time_partitioning_type=DAY \
    mydataset.mytable \
    gs://mybucket/mydata.parquet

以下命令将 gs://mybucket/mydata.parquet 中的数据加载到 mydataset 中名为 mytable 的分区表中。该表按 mytimestamp 列进行分区。

    bq load \
    --source_format=PARQUET \
    --time_partitioning_field mytimestamp \
    mydataset.mytable \
    gs://mybucket/mydata.parquet

以下命令将 gs://mybucket/ 中多个文件的数据加载到 mydataset 中名为 mytable 的表中。Cloud Storage URI 使用通配符。

    bq load \
    --source_format=PARQUET \
    mydataset.mytable \
    gs://mybucket/mydata*.parquet

以下命令将 gs://mybucket/ 中多个文件的数据加载到 mydataset 中名为 mytable 的表中。该命令包含以英文逗号分隔的 Cloud Storage URI 列表(含通配符)。

    bq load \
    --source_format=PARQUET \
    mydataset.mytable \
    "gs://mybucket/00/*.parquet","gs://mybucket/01/*.parquet"

API

  1. 创建指向 Cloud Storage 中源数据的 load 作业。

  2. (可选)在作业资源 jobReference 部分的 location 属性中指定您的位置

  3. source URIs 属性必须是完全限定的,格式为 gs://BUCKET/OBJECT。每个 URI 都可以包含一个“*”通配符

  4. sourceFormat 属性设置为 PARQUET,以指定 Parquet 数据格式。

  5. 如需检查作业状态,请调用 jobs.get(JOB_ID*),并将 JOB_ID 替换为初始请求返回的作业的 ID。

    • 如果 status.state = DONE,则表示作业已成功完成。
    • 如果出现 status.errorResult 属性,则表示请求失败,并且该对象将包含描述问题的相关信息。如果请求失败,则系统不会创建任何表,也不会加载任何数据。
    • 如果未出现 status.errorResult,则表示作业已成功完成,但可能存在一些非严重错误,如导入一些行时出错。非严重错误会列在返回的作业对象的 status.errors 属性中。

API 说明

  • 加载作业兼具原子性和一致性:如果加载作业失败,则所有数据都不可用;如果加载作业成功,则所有数据都可用。

  • 通过调用 jobs.insert 来创建加载作业时,最佳做法是生成唯一 ID,并将其作为 jobReference.jobId 传递。此方法受网络故障影响较小,因为客户端可以对已知的作业 ID 进行轮询或重试。

  • 对指定的作业 ID 调用 jobs.insert 具有幂等性。您可以对同一作业 ID 进行无限次重试,但最多只会有一个成功操作。

Go

在尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Go 设置说明进行操作。如需了解详情,请参阅 BigQuery Go API 参考文档

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// importParquet demonstrates loading Apache Parquet data from Cloud Storage into a table.
func importParquet(projectID, datasetID, tableID string) error {
	// projectID := "my-project-id"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	gcsRef := bigquery.NewGCSReference("gs://cloud-samples-data/bigquery/us-states/us-states.parquet")
	gcsRef.SourceFormat = bigquery.Parquet
	gcsRef.AutoDetect = true
	loader := client.Dataset(datasetID).Table(tableID).LoaderFrom(gcsRef)

	job, err := loader.Run(ctx)
	if err != nil {
		return err
	}
	status, err := job.Wait(ctx)
	if err != nil {
		return err
	}

	if status.Err() != nil {
		return fmt.Errorf("job completed with error: %v", status.Err())
	}
	return nil
}

Java

试用此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 BigQuery Java API 参考文档

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.TableId;
import java.math.BigInteger;

public class LoadParquet {

  public static void runLoadParquet() {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    loadParquet(datasetName);
  }

  public static void loadParquet(String datasetName) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      String sourceUri = "gs://cloud-samples-data/bigquery/us-states/us-states.parquet";
      TableId tableId = TableId.of(datasetName, "us_states");

      LoadJobConfiguration configuration =
          LoadJobConfiguration.builder(tableId, sourceUri)
              .setFormatOptions(FormatOptions.parquet())
              .build();

      // For more information on Job see:
      // https://googleapis.dev/java/google-cloud-clients/latest/index.html?com/google/cloud/bigquery/package-summary.html
      // Load the table
      Job job = bigquery.create(JobInfo.of(configuration));

      // Blocks until this load table job completes its execution, either failing or succeeding.
      Job completedJob = job.waitFor();
      if (completedJob == null) {
        System.out.println("Job not executed since it no longer exists.");
        return;
      } else if (completedJob.getStatus().getError() != null) {
        System.out.println(
            "BigQuery was unable to load the table due to an error: \n"
                + job.getStatus().getError());
        return;
      }

      // Check number of rows loaded into the table
      BigInteger numRows = bigquery.getTable(tableId).getNumRows();
      System.out.printf("Loaded %d rows. \n", numRows);

      System.out.println("GCS parquet loaded successfully.");
    } catch (BigQueryException | InterruptedException e) {
      System.out.println("GCS Parquet was not loaded. \n" + e.toString());
    }
  }
}

Node.js

在尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 BigQuery Node.js API 参考文档

// Import the Google Cloud client libraries
const {BigQuery} = require('@google-cloud/bigquery');
const {Storage} = require('@google-cloud/storage');

// Instantiate clients
const bigquery = new BigQuery();
const storage = new Storage();

/**
 * This sample loads the Parquet file at
 * https://storage.googleapis.com/cloud-samples-data/bigquery/us-states/us-states.parquet
 *
 * TODO(developer): Replace the following lines with the path to your file.
 */
const bucketName = 'cloud-samples-data';
const filename = 'bigquery/us-states/us-states.parquet';

async function loadTableGCSParquet() {
  // Imports a GCS file into a table with Parquet source format.

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'my_table';

  // Configure the load job. For full list of options, see:
  // https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad
  const metadata = {
    sourceFormat: 'PARQUET',
    location: 'US',
  };

  // Load data from a Google Cloud Storage file into the table
  const [job] = await bigquery
    .dataset(datasetId)
    .table(tableId)
    .load(storage.bucket(bucketName).file(filename), metadata);

  // load() waits for the job to finish
  console.log(`Job ${job.id} completed.`);

  // Check the job's status for errors
  const errors = job.status.errors;
  if (errors && errors.length > 0) {
    throw errors;
  }
}

PHP

在尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 PHP 设置说明进行操作。如需了解详情,请参阅 BigQuery PHP API 参考文档

use Google\Cloud\BigQuery\BigQueryClient;
use Google\Cloud\Core\ExponentialBackoff;

/** Uncomment and populate these variables in your code */
// $projectId  = 'The Google project ID';
// $datasetId  = 'The BigQuery dataset ID';

// instantiate the bigquery table service
$bigQuery = new BigQueryClient([
    'projectId' => $projectId,
]);
$dataset = $bigQuery->dataset($datasetId);
$table = $dataset->table('us_states');

// create the import job
$gcsUri = 'gs://cloud-samples-data/bigquery/us-states/us-states.parquet';
$loadConfig = $table->loadFromStorage($gcsUri)->sourceFormat('PARQUET');
$job = $table->runJob($loadConfig);
// poll the job until it is complete
$backoff = new ExponentialBackoff(10);
$backoff->execute(function () use ($job) {
    print('Waiting for job to complete' . PHP_EOL);
    $job->reload();
    if (!$job->isComplete()) {
        throw new Exception('Job has not yet completed', 500);
    }
});
// check if the job has errors
if (isset($job->info()['status']['errorResult'])) {
    $error = $job->info()['status']['errorResult']['message'];
    printf('Error running job: %s' . PHP_EOL, $error);
} else {
    print('Data imported successfully' . PHP_EOL);
}

Python

在尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Python 设置说明进行操作。如需了解详情,请参阅 BigQuery Python API 参考文档

使用 Client.load_table_from_uri() 方法,启动从 Cloud Storage 加载数据的作业。如需使用 Parquet,请将 LoadJobConfig.source_format 属性设置为 SourceFormat 常量 PARQUET,并将作业配置作为 job_config 参数传递给 load_table_from_uri() 方法。

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to create.
# table_id = "your-project.your_dataset.your_table_name"

job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.PARQUET,)
uri = "gs://cloud-samples-data/bigquery/us-states/us-states.parquet"

load_job = client.load_table_from_uri(
    uri, table_id, job_config=job_config
)  # Make an API request.

load_job.result()  # Waits for the job to complete.

destination_table = client.get_table(table_id)
print("Loaded {} rows.".format(destination_table.num_rows))

使用 Parquet 数据覆盖或附加到表

您可以通过添加来自源文件的数据或附加查询结果,将其他数据加载到表中。

在 Cloud Console 中,使用写入偏好设置选项指定从源文件或查询结果加载数据时要执行的操作。

将其他数据加载到表中时,可选择以下选项:

控制台选项 bq 工具标志 BigQuery API 属性 说明
只写入空白表 WRITE_EMPTY 仅当表为空时才写入数据。
附加到表 --noreplace--replace=false;如果未指定 --[no]replace,则默认为附加 WRITE_APPEND (默认)在表末尾附加数据。
覆盖表 --replace--replace=true WRITE_TRUNCATE 清空表中所有现有数据然后再写入新数据。 此操作还会删除表架构,并移除所有 Cloud KMS 密钥。

如果将数据加载到现有表中,加载作业可以附加数据或覆盖表。

您可以使用以下方式之一对表执行附加或覆盖操作:

  • Cloud Console
  • bq 命令行工具的 bq load 命令
  • 调用 jobs.insert API 方法并配置 load 作业
  • 客户端库

如需使用 Parquet 数据附加数据或覆盖表,请执行以下操作:

控制台

  1. 在 Cloud Console 中打开 BigQuery 页面。

    转到 BigQuery 页面

  2. 探索器面板中,展开您的项目并选择数据集。

  3. 在详细信息面板中,点击创建表

  4. 创建表页面的来源部分,执行以下操作:

    • 基于以下数据创建表部分,选择 Cloud Storage。

    • 在来源字段中,浏览至或输入 Cloud Storage URI。请注意,Cloud Console 不支持添加多个 URI,但支持使用通配符。Cloud Storage 存储分区必须与您要向其中附加数据或覆盖其数据的表所属的数据集位于同一位置。

      选择文件。

    • 文件格式部分,选择 Parquet

  5. 创建表格页面的目标位置部分,执行以下操作:

    • 数据集名称部分,选择相应数据集。

      选择数据集。

    • 表名称字段中,输入您要在 BigQuery 中对其执行附加或覆盖操作的表的名称。

    • 确认表类型设置为原生表

  6. Schema 部分中,无需执行任何操作。架构在 Parquet 文件中为自描述形式。

  7. 对于分区和聚簇设置,保留默认值。您无法通过对表执行附加或覆盖操作将表转换为分区表或聚簇表,并且 Cloud Console 不支持在加载作业中对分区表或聚簇表执行附加或覆盖操作。

  8. 点击高级选项

    • 写入偏好设置部分,选择附加到表覆盖表
    • 允许的错误数部分中,接受默认值 0 或输入可忽略的含错行数上限。如果包含错误的行数超过此值,该作业将生成 invalid 消息并失败。
    • 未知值部分,取消选中忽略未知值。 此选项仅适用于 CSV 和 JSON 文件。
    • 加密部分中,点击客户管理的密钥,以使用 Cloud Key Management Service 密钥。如果保留 Google 管理的密钥设置,BigQuery 将对静态数据进行加密

      覆盖表。

  9. 点击创建表

bq

输入带 --replace 标志的 bq load 命令可以覆盖表。使用 --noreplace 标志可将数据附加到表。如果未指定任何标志,则默认附加数据。您可以添加 --source_format 标志并将其设置为 PARQUET。由于系统会自动从自描述源数据中检索 Parquet 架构,因此您无需提供架构定义。

(可选)提供 --location 标志并将其值设置为您的位置

其他可选标志包括:

  • --destination_kms_key:用于加密表数据的 Cloud KMS 密钥。
bq --location=LOCATION load \
--[no]replace \
--source_format=FORMAT \
DATASET.TABLE \
PATH_TO_SOURCE

请替换以下内容:

  • location:您所在的位置--location 是可选标志。您可以使用 .bigqueryrc 文件设置位置的默认值。
  • formatPARQUET
  • dataset:现有数据集。
  • table:要向其中加载数据的表的名称。
  • path_to_source 是完全限定的 Cloud Storage URI 或以英文逗号分隔的 URI 列表。系统也支持使用通配符

示例:

以下命令可从 gs://mybucket/mydata.parquet 加载数据并覆盖 mydataset 中名为 mytable 的表。

    bq load \
    --replace \
    --source_format=PARQUET \
    mydataset.mytable \
    gs://mybucket/mydata.parquet

以下命令可从 gs://mybucket/mydata.parquet 加载数据,并将数据附加到 mydataset 中名为 mytable 的表。

    bq load \
    --noreplace \
    --source_format=PARQUET \
    mydataset.mytable \
    gs://mybucket/mydata.parquet

如需了解如何使用 bq 命令行工具附加和覆盖分区表,请参阅对分区表数据执行附加和覆盖操作

API

  1. 创建指向 Cloud Storage 中源数据的 load 作业。

  2. (可选)在作业资源 jobReference 部分的 location 属性中指定您的位置

  3. source URIs 属性必须是完全限定的,格式为 gs://BUCKET/OBJECT。您可以采用英文逗号分隔列表的形式添加多个 URI。请注意,系统也支持通配符

  4. configuration.load.sourceFormat 属性设置为 PARQUET,以指定数据格式。

  5. configuration.load.writeDisposition 属性设置为 WRITE_TRUNCATEWRITE_APPEND,以指定写入偏好设置。

Go

在尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Go 设置说明进行操作。如需了解详情,请参阅 BigQuery Go API 参考文档

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// importParquetTruncate demonstrates loading Apache Parquet data from Cloud Storage into a table
// and overwriting/truncating existing data in the table.
func importParquetTruncate(projectID, datasetID, tableID string) error {
	// projectID := "my-project-id"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	gcsRef := bigquery.NewGCSReference("gs://cloud-samples-data/bigquery/us-states/us-states.parquet")
	gcsRef.SourceFormat = bigquery.Parquet
	gcsRef.AutoDetect = true
	loader := client.Dataset(datasetID).Table(tableID).LoaderFrom(gcsRef)
	loader.WriteDisposition = bigquery.WriteTruncate

	job, err := loader.Run(ctx)
	if err != nil {
		return err
	}
	status, err := job.Wait(ctx)
	if err != nil {
		return err
	}

	if status.Err() != nil {
		return fmt.Errorf("job completed with error: %v", status.Err())
	}
	return nil
}

Java

试用此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 BigQuery Java API 参考文档


import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobInfo.WriteDisposition;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.TableId;
import java.math.BigInteger;

public class LoadParquetReplaceTable {

  public static void runLoadParquetReplaceTable() {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    loadParquetReplaceTable(datasetName);
  }

  public static void loadParquetReplaceTable(String datasetName) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      // Imports a GCS file into a table and overwrites table data if table already exists.
      // This sample loads CSV file at:
      // https://storage.googleapis.com/cloud-samples-data/bigquery/us-states/us-states.csv
      String sourceUri = "gs://cloud-samples-data/bigquery/us-states/us-states.parquet";
      TableId tableId = TableId.of(datasetName, "us_states");

      // For more information on LoadJobConfiguration see:
      // https://googleapis.dev/java/google-cloud-clients/latest/com/google/cloud/bigquery/LoadJobConfiguration.Builder.html
      LoadJobConfiguration configuration =
          LoadJobConfiguration.builder(tableId, sourceUri)
              .setFormatOptions(FormatOptions.parquet())
              // Set the write disposition to overwrite existing table data.
              .setWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
              .build();

      // For more information on Job see:
      // https://googleapis.dev/java/google-cloud-clients/latest/index.html?com/google/cloud/bigquery/package-summary.html
      // Load the table
      Job job = bigquery.create(JobInfo.of(configuration));

      // Load data from a GCS parquet file into the table
      // Blocks until this load table job completes its execution, either failing or succeeding.
      Job completedJob = job.waitFor();
      if (completedJob == null) {
        System.out.println("Job not executed since it no longer exists.");
        return;
      } else if (completedJob.getStatus().getError() != null) {
        System.out.println(
            "BigQuery was unable to load into the table due to an error: \n"
                + job.getStatus().getError());
        return;
      }

      // Check number of rows loaded into the table
      BigInteger numRows = bigquery.getTable(tableId).getNumRows();
      System.out.printf("Loaded %d rows. \n", numRows);

      System.out.println("GCS parquet overwrote existing table successfully.");
    } catch (BigQueryException | InterruptedException e) {
      System.out.println("Table extraction job was interrupted. \n" + e.toString());
    }
  }
}

Node.js

在尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 BigQuery Node.js API 参考文档

// Import the Google Cloud client libraries
const {BigQuery} = require('@google-cloud/bigquery');
const {Storage} = require('@google-cloud/storage');

// Instantiate clients
const bigquery = new BigQuery();
const storage = new Storage();

/**
 * This sample loads the CSV file at
 * https://storage.googleapis.com/cloud-samples-data/bigquery/us-states/us-states.csv
 *
 * TODO(developer): Replace the following lines with the path to your file.
 */
const bucketName = 'cloud-samples-data';
const filename = 'bigquery/us-states/us-states.parquet';

async function loadParquetFromGCSTruncate() {
  /**
   * Imports a GCS file into a table and overwrites
   * table data if table already exists.
   */

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = "my_dataset";
  // const tableId = "my_table";

  // Configure the load job. For full list of options, see:
  // https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad
  const metadata = {
    sourceFormat: 'PARQUET',
    // Set the write disposition to overwrite existing table data.
    writeDisposition: 'WRITE_TRUNCATE',
    location: 'US',
  };

  // Load data from a Google Cloud Storage file into the table
  const [job] = await bigquery
    .dataset(datasetId)
    .table(tableId)
    .load(storage.bucket(bucketName).file(filename), metadata);
  // load() waits for the job to finish
  console.log(`Job ${job.id} completed.`);

  // Check the job's status for errors
  const errors = job.status.errors;
  if (errors && errors.length > 0) {
    throw errors;
  }
}

PHP

在尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 PHP 设置说明进行操作。如需了解详情,请参阅 BigQuery PHP API 参考文档

use Google\Cloud\BigQuery\BigQueryClient;
use Google\Cloud\Core\ExponentialBackoff;

/** Uncomment and populate these variables in your code */
// $projectId = 'The Google project ID';
// $datasetId = 'The BigQuery dataset ID';
// $tableID = 'The BigQuery table ID';

// instantiate the bigquery table service
$bigQuery = new BigQueryClient([
    'projectId' => $projectId,
]);
$table = $bigQuery->dataset($datasetId)->table($tableId);

// create the import job
$gcsUri = 'gs://cloud-samples-data/bigquery/us-states/us-states.parquet';
$loadConfig = $table->loadFromStorage($gcsUri)->sourceFormat('PARQUET')->writeDisposition('WRITE_TRUNCATE');
$job = $table->runJob($loadConfig);

// poll the job until it is complete
$backoff = new ExponentialBackoff(10);
$backoff->execute(function () use ($job) {
    print('Waiting for job to complete' . PHP_EOL);
    $job->reload();
    if (!$job->isComplete()) {
        throw new Exception('Job has not yet completed', 500);
    }
});

// check if the job has errors
if (isset($job->info()['status']['errorResult'])) {
    $error = $job->info()['status']['errorResult']['message'];
    printf('Error running job: %s' . PHP_EOL, $error);
} else {
    print('Data imported successfully' . PHP_EOL);
}

Python

在尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Python 设置说明进行操作。如需了解详情,请参阅 BigQuery Python API 参考文档

要替换现有表中的行,请将 LoadJobConfig.write_disposition 属性设置为 WriteDisposition 常量 WRITE_TRUNCATE

import six

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to create.
# table_id = "your-project.your_dataset.your_table_name

job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("name", "STRING"),
        bigquery.SchemaField("post_abbr", "STRING"),
    ],
)

body = six.BytesIO(b"Washington,WA")
client.load_table_from_file(body, table_id, job_config=job_config).result()
previous_rows = client.get_table(table_id).num_rows
assert previous_rows > 0

job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    source_format=bigquery.SourceFormat.PARQUET,
)

uri = "gs://cloud-samples-data/bigquery/us-states/us-states.parquet"
load_job = client.load_table_from_uri(
    uri, table_id, job_config=job_config
)  # Make an API request.

load_job.result()  # Waits for the job to complete.

destination_table = client.get_table(table_id)
print("Loaded {} rows.".format(destination_table.num_rows))

加载 Hive 分区的 Parquet 数据

BigQuery 支持加载存储在 Cloud Storage 中的 Hive 分区 Parquet 数据,并将 Hive 分区列作为目标 BigQuery 代管表中的列进行填充。如需了解详情,请参阅加载外部分区数据

Parquet 转换

BigQuery 会将 Parquet 数据类型转换为以下 BigQuery 数据类型:

类型转换

Parquet 类型 Parquet 逻辑类型 BigQuery 数据类型
BOOLEAN BOOLEAN
INT32 无、INTEGERUINT_8UINT_16UINT_32INT_8INT_16INT_32 INTEGER
INT32 DECIMAL NUMERIC、BIGNUMER 或 STRING
INT32 DATE DATE
INT64 无、INTEGERUINT_64INT_64 INTEGER
INT64 DECIMAL NUMERIC、BIGNUMER 或 STRING
INT64 TIMESTAMPprecision=MILLIS (TIMESTAMP_MILLIS) TIMESTAMP
INT64 TIMESTAMPprecision=MICROS (TIMESTAMP_MICROS) TIMESTAMP
INT96 TIMESTAMP
FLOAT FLOAT
DOUBLE FLOAT
BYTE_ARRAY BYTES
BYTE_ARRAY STRING (UTF8) STRING
FIXED_LEN_BYTE_ARRAY DECIMAL NUMERIC、BIGNUMER 或 STRING
FIXED_LEN_BYTE_ARRAY BYTES

嵌套群组会转换为 STRUCT 类型。系统不支持 Parquet 类型和转换类型的其他组合。

decimal 逻辑类型

Decimal 逻辑类型可以转换为 NUMERICBIGNUMERICSTRING 类型。转换后的类型取决于 decimal 逻辑类型的精度和比例参数以及指定的 decimal 目标类型。按如下方式指定小数目标类型:

Enum 逻辑类型

Enum 逻辑类型可以转换为 STRINGBYTES。按如下方式指定转换目标类型:

List 逻辑类型

您可以为 Parquet LIST 逻辑类型启用架构推断。BigQuery 会检查 LIST 节点是否为标准形式

<optional | required> group <name> (LIST) {
  repeated group list {
    <optional | required> <element-type> element;
  }
}

如果是,则转换后架构中 LIST 节点的相应字段将被视为节点具有以下架构:

repeated <element-type> <name>

节点“list”和“element”被省略。

列名称转换

列名称只能包含字母(a-z、A-Z),数字 (0-9) 或下划线 (_),并且必须以字母或下划线开头。列名长度不得超过 300 个字符。列名不能使用以下任何前缀:

  • _TABLE_
  • _FILE_
  • _PARTITION

列名不可重复,即使其大小写不同也不行。例如,名为 Column1 的列和名为 column1 的列被视作相同。

如果 Parquet 文件中的某些列名称中包含英文句点 (.),则您不能加载此文件。

如果 Parquet 列名称包含其他字符(英文句点除外),系统会用下划线替换这些字符。您可以在列名称末尾处添加下划线,以避免冲突。例如,如果一个 Parquet 文件包含 Column1column1 这两列,那么系统会将这些列分别加载为 Column1column1_