从 Cloud Storage 加载 Avro 数据

Avro 是一种开放源代码数据格式,它将序列化数据与数据的架构打包在同一文件中。

从 Cloud Storage 加载 Avro 数据时,可以将数据加载到新的表或分区中,也可以附加到或覆盖现有的表或分区。当数据加载到 BigQuery 时,它会转化为适用于 Capacitor 的列式格式(BigQuery 的存储格式)。

如需将 Cloud Storage 中的数据加载到 BigQuery 表,则包含该表的数据集必须与相应 Cloud Storage 存储分区位于同一区域或多区域位置。

如需详细了解如何从本地文件加载 Avro 数据,请参阅从本地数据源将数据加载到 BigQuery

限制

将数据从 Cloud Storage 存储桶加载到 BigQuery 时,需要遵循以下限制:

  • 如果您的数据集位置设置为 US 多区域以外的值,则 Cloud Storage 存储桶必须与数据集位于同一单区域中或包含在同一多区域内。
  • BigQuery 不保证外部数据源的数据一致性。在查询运行的过程中,底层数据的更改可能会导致意外行为。
  • BigQuery 不支持 Cloud Storage 对象版本控制。如果您在 Cloud Storage URI 中添加了世代编号,则加载作业将失败。

输入文件要求

将 Avro 文件加载到 BigQuery 时,请遵循以下准则,以避免 resourcesExceeded 错误:

  • 行大小不得超过 50 MB。
  • 如果该行包含许多数组字段或任何非常长的数组字段,请将数组值拆分为单独的字段。

须知事项

授予为用户提供执行本文档中的每个任务所需权限的 Identity and Access Management (IAM) 角色,并创建一个数据集和表来存储您的数据。

所需权限

如需将数据加载到 BigQuery,您需要拥有 IAM 权限才能运行加载作业以及将数据加载到 BigQuery 表和分区中。如果要从 Cloud Storage 加载数据,您还需要拥有访问包含数据的存储桶的 IAM 权限。

将数据加载到 BigQuery 的权限

如需将数据加载到新的 BigQuery 表或分区中,或者附加或覆盖现有的表或分区,您需要拥有以下 IAM 权限:

  • bigquery.tables.create
  • bigquery.tables.updateData
  • bigquery.tables.update
  • bigquery.jobs.create

以下预定义 IAM 角色都具有将数据加载到 BigQuery 表或分区所需的权限:

  • roles/bigquery.dataEditor
  • roles/bigquery.dataOwner
  • roles/bigquery.admin(包括 bigquery.jobs.create 权限)
  • bigquery.user(包括 bigquery.jobs.create 权限)
  • bigquery.jobUser(包括 bigquery.jobs.create 权限)

此外,如果您拥有 bigquery.datasets.create 权限,则可以在自己创建的数据集中使用加载作业创建和更新表。

如需详细了解 BigQuery 中的 IAM 角色和权限,请参阅预定义的角色和权限

从 Cloud Storage 加载数据的权限

如需获得从 Cloud Storage 存储桶加载数据所需的权限,请让您的管理员为您授予存储桶的 Storage Admin (roles/storage.admin) IAM 角色。如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色可提供从 Cloud Storage 存储桶加载数据所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

如需从 Cloud Storage 存储桶加载数据,您需要具备以下权限:

  • storage.buckets.get
  • storage.objects.get
  • storage.objects.list (required if you are using a URI wildcard)

您也可以使用自定义角色或其他预定义角色来获取这些权限。

创建数据集和表

如需存储数据,您必须创建一个 BigQuery 数据集,然后在该数据集中创建一个 BigQuery 表

Avro 的优势

Avro 是将数据加载到 BigQuery 的首选格式。与加载 CSV 和 JSON(换行符分隔)相比,加载 Avro 文件具有以下优势:

  • Avro 二进制格式:
    • 加载速度更快。即使数据块被压缩,也可以并行读取数据。
    • 无需输入或序列化。
    • 更容易解析,因为没有其他格式(如 ASCII)中存在的编码问题。
  • 将 Avro 文件加载到 BigQuery 时,系统会自动从自描述源数据中检索表架构。

Avro 架构

将 Avro 文件加载到新的 BigQuery 表时,系统会自动使用源数据检索表架构。BigQuery 从源数据中检索架构时,将使用按字母顺序显示的最后一个文件。

假设您在 Cloud Storage 中有以下 Avro 文件:

gs://mybucket/00/
  a.avro
  z.avro
gs://mybucket/01/
  b.avro

在 bq 命令行工具中运行此命令时,系统会加载所有文件(以英文逗号分隔列表形式),而架构来源于 mybucket/01/b.avro

bq load \
--source_format=AVRO \
dataset.table \
"gs://mybucket/00/*.avro","gs://mybucket/01/*.avro"

在导入多个具有不同 Avro 架构的 Avro 文件时,所有架构都必须与 Avro 的架构解决方案兼容。

当 BigQuery 检测架构时,会将某些 Avro 数据类型转换为 BigQuery 数据类型,使其与 GoogleSQL 语法兼容。如需了解详细信息,请参阅 Avro 转化

如需提供用于创建外部表的表架构,请将 BigQuery API 中的 referenceFileSchemaUri 属性或 bq 命令行工具中的
--reference_file_schema_uri 参数设置为参考文件的网址。

例如 --reference_file_schema_uri="gs://mybucket/schema.avro"

Avro 压缩

BigQuery 支持对 Avro 文件内容使用以下压缩编解码器:

  • Snappy
  • DEFLATE
  • ZSTD

向新表加载 Avro 数据

如需将 Avro 数据从 Cloud Storage 加载到新的 BigQuery 表中,请选择以下选项之一:

控制台

  1. 在 Google Cloud 控制台中,打开 BigQuery 页面。

    转到 BigQuery

  2. 浏览器面板中,展开您的项目并选择数据集。

  3. 展开 操作选项,然后点击打开

  4. 在详情面板中,点击创建表

  5. 创建表页面的来源部分,执行以下操作:

    • 基于以下数据创建表部分,选择 Google Cloud Storage

    • 在来源字段中,浏览至或输入 Cloud Storage URI。请注意,Google Cloud 控制台不支持添加多个 URI,但支持使用通配符。Cloud Storage 存储分区必须与您要创建的表所属的数据集位于同一位置。

      选择文件

    • 对于 File format,请选择 Avro

  6. 创建表页面的目标位置部分,执行以下操作:

    • 数据集名称部分,选择相应数据集。
    • 确认表类型设置为原生表
    • 表名称字段中,输入您要在 BigQuery 中创建的表的名称。
  7. 架构部分中,无需执行任何操作。架构在 Avro 文件中为自描述形式。

  8. (可选)如需对表进行分区,请在分区和聚簇设置中选择相应选项。如需了解详情,请参阅创建分区表

  9. (可选)在分区过滤条件中,点击需要分区过滤条件框,以要求用户添加 WHERE 子句来指定要查询的分区。要求分区过滤条件有可能减少费用并提高性能。如需了解详情,请参阅在查询中要求使用分区过滤条件。如果已选择不进行分区,则此选项不可用。

  10. (可选)如需对该表进行聚簇,请在聚簇顺序框中,输入一到四个字段名称。

  11. (可选)点击高级选项

    • 写入偏好设置部分,选中只写入空白表。此选项创建一个新表并向其中加载数据。
    • 未知值部分,取消选中忽略未知值。此选项仅适用于 CSV 和 JSON 文件。
    • 加密部分中,点击客户管理的密钥,以使用 Cloud Key Management Service 密钥。如果保留 Google 管理的 密钥设置,BigQuery 将对静态数据进行加密
  12. 点击创建表

SQL

使用 LOAD DATA DDL 语句. 以下示例将 Avro 文件加载到新的 mytable 表中:

  1. 在 Google Cloud 控制台中,前往 BigQuery 页面。

    转到 BigQuery

  2. 在查询编辑器中,输入以下语句:

    LOAD DATA OVERWRITE mydataset.mytable
    FROM FILES (
      format = 'avro',
      uris = ['gs://bucket/path/file.avro']);

  3. 点击 运行

如需详细了解如何运行查询,请参阅运行交互式查询

bq

使用 bq load 命令,通过 --source_format 标志指定 AVRO,并添加 Cloud Storage URI。您可以添加单个 URI、以英文逗号分隔的 URI 列表或含有通配符的 URI。

(可选)提供 --location 标志并将其值设置为您的位置

其他可选标志包括:

  • --time_partitioning_type:此标志会在表上启用基于时间的分区,并设置分区类型。可能的值包括 HOURDAYMONTHYEAR。当您创建按 DATEDATETIMETIMESTAMP 列分区的表时,可选用此标志。基于时间的分区的默认分区类型为 DAY。 您无法更改现有表上的分区规范。
  • --time_partitioning_expiration:此标志值为一个整数,指定了应在何时删除基于时间的分区(以秒为单位)。过期时间以分区的世界协调时间 (UTC) 日期加上这个整数值为准。
  • --time_partitioning_field:此标志表示用于创建分区表的 DATETIMESTAMP 列。如果在未提供此值的情况下启用了基于时间的分区,系统会创建注入时间分区表。
  • --require_partition_filter:启用后,此选项会要求用户添加 WHERE 子句来指定要查询的分区。要求分区过滤条件有可能减少费用并提高性能。 如需了解详情,请参阅在查询中要求使用分区过滤条件
  • --clustering_fields:此标志表示以英文逗号分隔的列名称列表(最多包含 4 个列名称),用于创建聚簇表
  • --destination_kms_key:用于加密表数据的 Cloud KMS 密钥。

    如需详细了解分区表,请参阅:

    如需详细了解聚簇表,请参阅:

    如需详细了解表加密,请参阅以下部分:

如需将 Avro 数据加载到 BigQuery,请输入以下命令:

bq --location=location load \
--source_format=format \
dataset.table \
path_to_source

请替换以下内容:

  • location 是您的位置。--location 是可选标志。例如,如果您在东京区域使用 BigQuery,可将该标志的值设置为 asia-northeast1。您可以使用 .bigqueryrc 文件设置位置的默认值。
  • formatAVRO
  • dataset 是现有数据集。
  • table 是要向其中加载数据的表的名称。
  • path_to_source 是完全限定的 Cloud Storage URI 或以英文逗号分隔的 URI 列表。您还可以使用通配符

示例:

以下命令将 gs://mybucket/mydata.avro 中的数据加载到 mydataset 中名为 mytable 的表中。

    bq load \
    --source_format=AVRO \
    mydataset.mytable \
    gs://mybucket/mydata.avro

以下命令将数据从 gs://mybucket/mydata.avro 加载到 mydataset 中名为 mytable 的注入时间分区表中。

    bq load \
    --source_format=AVRO \
    --time_partitioning_type=DAY \
    mydataset.mytable \
    gs://mybucket/mydata.avro

以下命令将 gs://mybucket/mydata.avro 中的数据加载到 mydataset 中名为 mytable 的新分区表中。该表按 mytimestamp 列进行分区。

    bq load \
    --source_format=AVRO \
    --time_partitioning_field mytimestamp \
    mydataset.mytable \
    gs://mybucket/mydata.avro

以下命令将 gs://mybucket/ 中多个文件的数据加载到 mydataset 中名为 mytable 的表中。Cloud Storage URI 使用通配符。

    bq load \
    --source_format=AVRO \
    mydataset.mytable \
    gs://mybucket/mydata*.avro

以下命令将 gs://mybucket/ 中多个文件的数据加载到 mydataset 中名为 mytable 的表中。该命令包含以英文逗号分隔的 Cloud Storage URI 列表(含通配符)。

    bq load \
    --source_format=AVRO \
    mydataset.mytable \
    "gs://mybucket/00/*.avro","gs://mybucket/01/*.avro"

API

  1. 创建指向 Cloud Storage 中源数据的 load 作业。

  2. (可选)在作业资源 jobReference 部分的 location 属性中指定您的位置

  3. source URIs 属性必须是完全限定的,格式为 gs://bucket/object。每个 URI 只可包含一个“*”通配符

  4. sourceFormat 属性设置为 AVRO,以指定 Avro 数据格式。

  5. 如需检查作业状态,请调用 jobs.get(job_id*),其中 job_id 是初始请求返回的作业的 ID。

    • 如果 status.state = DONE,则表示作业已成功完成。
    • 如果出现 status.errorResult 属性,则表示请求失败,并且该对象将包含描述问题的相关信息。如果请求失败,则不创建任何表且不加载任何数据。
    • 如果未出现 status.errorResult,则表示作业已成功完成,但可能存在一些非严重错误,如导入一些行时出错。非严重错误会列在返回的作业对象的 status.errors 属性中。

API 说明

  • 加载作业兼具原子性和一致性。也就是说,如果加载作业失败,则所有数据都不可用;如果加载作业成功,则所有数据全部可用。

  • 通过调用 jobs.insert 来创建加载作业时,最佳做法是生成唯一 ID,并将其作为 jobReference.jobId 传递。此方法受网络故障影响较小,因为客户端可以对已知的作业 ID 进行轮询或重试。

  • 对指定的作业 ID 调用 jobs.insert 具有幂等性。您可以对同一作业 ID 进行无限次重试,但最多只会有一个成功操作。

Go

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Go 设置说明进行操作。 如需了解详情,请参阅 BigQuery Go API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// importAvro demonstrates loading Apache Avro data from Cloud Storage into a table.
func importAvro(projectID, datasetID, tableID string) error {
	// projectID := "my-project-id"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	gcsRef := bigquery.NewGCSReference("gs://cloud-samples-data/bigquery/us-states/us-states.avro")
	gcsRef.SourceFormat = bigquery.Avro
	loader := client.Dataset(datasetID).Table(tableID).LoaderFrom(gcsRef)

	job, err := loader.Run(ctx)
	if err != nil {
		return err
	}
	status, err := job.Wait(ctx)
	if err != nil {
		return err
	}

	if status.Err() != nil {
		return fmt.Errorf("job completed with error: %v", status.Err())
	}
	return nil
}

Java

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Java 设置说明进行操作。 如需了解详情,请参阅 BigQuery Java API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.TableId;

// Sample to load Avro data from Cloud Storage into a new BigQuery table
public class LoadAvroFromGCS {

  public static void runLoadAvroFromGCS() {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    String sourceUri = "gs://cloud-samples-data/bigquery/us-states/us-states.avro";
    loadAvroFromGCS(datasetName, tableName, sourceUri);
  }

  public static void loadAvroFromGCS(String datasetName, String tableName, String sourceUri) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      TableId tableId = TableId.of(datasetName, tableName);
      LoadJobConfiguration loadConfig =
          LoadJobConfiguration.of(tableId, sourceUri, FormatOptions.avro());

      // Load data from a GCS Avro file into the table
      Job job = bigquery.create(JobInfo.of(loadConfig));
      // Blocks until this load table job completes its execution, either failing or succeeding.
      job = job.waitFor();
      if (job.isDone()) {
        System.out.println("Avro from GCS successfully loaded in a table");
      } else {
        System.out.println(
            "BigQuery was unable to load into the table due to an error:"
                + job.getStatus().getError());
      }
    } catch (BigQueryException | InterruptedException e) {
      System.out.println("Column not added during load append \n" + e.toString());
    }
  }
}

Node.js

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Node.js 设置说明进行操作。 如需了解详情,请参阅 BigQuery Node.js API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

// Import the Google Cloud client libraries
const {BigQuery} = require('@google-cloud/bigquery');
const {Storage} = require('@google-cloud/storage');

// Instantiate clients
const bigquery = new BigQuery();
const storage = new Storage();

/**
 * This sample loads the Avro file at
 * https://storage.googleapis.com/cloud-samples-data/bigquery/us-states/us-states.avro
 *
 * TODO(developer): Replace the following lines with the path to your file.
 */
const bucketName = 'cloud-samples-data';
const filename = 'bigquery/us-states/us-states.avro';

async function loadTableGCSAvro() {
  // Imports a GCS file into a table with Avro source format.

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'us_states';

  // Configure the load job. For full list of options, see:
  // https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad
  const jobConfigurationLoad = {
    load: {sourceFormat: 'AVRO'},
  };

  // Load data from a Google Cloud Storage file into the table
  const [job] = await bigquery
    .dataset(datasetId)
    .table(tableId)
    .load(storage.bucket(bucketName).file(filename), jobConfigurationLoad);

  // load() waits for the job to finish
  console.log(`Job ${job.id} completed.`);

  // Check the job's status for errors
  const errors = job.status.errors;
  if (errors && errors.length > 0) {
    throw errors;
  }
}

Python

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Python 设置说明进行操作。 如需了解详情,请参阅 BigQuery Python API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to create.
# table_id = "your-project.your_dataset.your_table_name

job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.AVRO)
uri = "gs://cloud-samples-data/bigquery/us-states/us-states.avro"

load_job = client.load_table_from_uri(
    uri, table_id, job_config=job_config
)  # Make an API request.

load_job.result()  # Waits for the job to complete.

destination_table = client.get_table(table_id)
print("Loaded {} rows.".format(destination_table.num_rows))

从 Avro 数据中提取 JSON 数据

您可以通过以下两种方式确保将 Avro 数据作为 JSON 数据加载到 BigQuery 中:

  1. 通过将 sqlType 设置为 JSON 来注释 Avro 架构。在下面的 Avro 架构中加载数据时,json_field 列将读取为 JSON 类型:

    {
        "type": {"type": "string", "sqlType": "JSON"},
        "name": "json_field"
    }
  2. 明确指定 BigQuery 目标表架构,并将列类型设置为 JSON。如需了解详情,请参阅指定架构

如果您未在 Avro 架构或 BigQuery 表架构中指定 JSON 类型,则数据将读取为 STRING

使用 Avro 数据覆盖或附加到表

您可以通过添加来自源文件的数据或附加查询结果,将其他数据加载到表中。

在 Google Cloud 控制台中,使用写入偏好设置选项指定从源文件或查询结果加载数据时要执行的操作。

将其他数据加载到表中时,可选择以下选项:

控制台选项 bq 工具标志 BigQuery API 属性 说明
只写入空白表 不支持 WRITE_EMPTY 仅当表为空时才写入数据。
Append to table --noreplace--replace=false;如果未指定 --[no]replace,则默认为附加 WRITE_APPEND 默认)在表末尾附加数据。
覆盖表 --replace--replace=true WRITE_TRUNCATE 清空表中所有现有数据然后再写入新数据。 此操作还会删除表架构和行级安全性,并移除所有 Cloud KMS 密钥。

如果将数据加载到现有表中,加载作业可以附加数据或覆盖表。

如需使用 Avro 数据附加或覆盖表,请执行以下操作:

控制台

  1. 在 Google Cloud 控制台中,打开 BigQuery 页面。

    转到 BigQuery

  2. 浏览器面板中,展开您的项目并选择数据集。

  3. 展开 操作选项,然后点击打开

  4. 在详情面板中,点击创建表

  5. 创建表页面的来源部分,执行以下操作:

    • 基于以下数据创建表部分,选择 Cloud Storage。
    • 在来源字段中,浏览至或输入 Cloud Storage URI。请注意, Google Cloud 控制台不支持添加多个 URI,但支持使用通配符。Cloud Storage 存储分区必须与您要向其中附加数据或覆盖其数据的表所属的数据集位于同一位置。

      选择文件

    • 对于 File format,请选择 Avro

  6. 创建表页面的目标位置部分,执行以下操作:

    • 数据集名称部分,选择相应数据集。

      选择数据集

    • 表名称字段中,输入您要向其附加数据或覆盖其数据的 BigQuery 表的名称。

    • 确认表类型设置为原生表

  7. 架构部分中,无需执行任何操作。架构在 Avro 文件中为自描述形式。

  8. 对于分区和聚簇设置,保留默认值。您无法通过附加或覆盖表将表转换为分区表或聚簇表,并且 Google Cloud 控制台不支持在加载作业中对分区表或聚簇表执行附加或覆盖操作。

  9. 点击高级选项

    • 写入偏好设置部分,选择附加到表覆盖表
    • 未知值部分,取消选中忽略未知值。此选项仅适用于 CSV 和 JSON 文件。
    • 加密部分中,点击客户管理的密钥,以使用 Cloud Key Management Service 密钥。如果保留 Google 自有且受管理的 密钥设置,BigQuery 将对静态数据进行加密
  10. 点击创建表

SQL

使用 LOAD DATA DDL 语句. 以下示例将 Avro 文件附加到表 mytable

  1. 在 Google Cloud 控制台中,前往 BigQuery 页面。

    转到 BigQuery

  2. 在查询编辑器中,输入以下语句:

    LOAD DATA INTO mydataset.mytable
    FROM FILES (
      format = 'avro',
      uris = ['gs://bucket/path/file.avro']);

  3. 点击 运行

如需详细了解如何运行查询,请参阅运行交互式查询

bq

输入带 --replace 标志的 bq load 命令可以覆盖表。使用 --noreplace 标志可将数据附加到表。如果未指定任何标志,则默认附加数据。您可以提供 --source_format 标志并将其设置为 AVRO。由于系统会从自描述源数据中自动检索 Avro 架构,因此您无需提供架构定义。

(可选)提供 --location 标志并将其值设置为您的位置

其他可选标志包括:

  • --destination_kms_key:用于加密表数据的 Cloud KMS 密钥。
bq --location=location load \
--[no]replace \
--source_format=format \
dataset.table \
path_to_source

请替换以下内容:

  • location 是您所在的位置--location 是可选标志。您可以使用 .bigqueryrc 文件设置位置的默认值。
  • formatAVRO
  • dataset 是现有数据集。
  • table 是要向其中加载数据的表的名称。
  • path_to_source 是完全限定的 Cloud Storage URI 或以英文逗号分隔的 URI 列表。您还可以使用通配符

示例:

以下命令可从 gs://mybucket/mydata.avro 加载数据并覆盖 mydataset 中名为 mytable 的表。

    bq load \
    --replace \
    --source_format=AVRO \
    mydataset.mytable \
    gs://mybucket/mydata.avro

以下命令可从 gs://mybucket/mydata.avro 加载数据,并将数据附加到 mydataset 中名为 mytable 的表。

    bq load \
    --noreplace \
    --source_format=AVRO \
    mydataset.mytable \
    gs://mybucket/mydata.avro

如需了解如何使用 bq 命令行工具附加和覆盖分区表,请参阅对分区表数据执行附加和覆盖操作

API

  1. 创建指向 Cloud Storage 中源数据的 load 作业。

  2. (可选)在作业资源 jobReference 部分的 location 属性中指定您的位置

  3. source URIs 属性必须是完全限定的,格式为 gs://bucket/object。您可以采用逗号分隔列表的形式添加多个 URI。请注意,系统也支持通配符

  4. configuration.load.sourceFormat 属性设置为 AVRO,以指定数据格式。

  5. configuration.load.writeDisposition 属性设置为 WRITE_TRUNCATEWRITE_APPEND,以指定写入偏好设置。

Go

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Go 设置说明进行操作。 如需了解详情,请参阅 BigQuery Go API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// importAvroTruncate demonstrates loading Apache Avro data from Cloud Storage into a table
// and overwriting/truncating existing data in the table.
func importAvroTruncate(projectID, datasetID, tableID string) error {
	// projectID := "my-project-id"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	gcsRef := bigquery.NewGCSReference("gs://cloud-samples-data/bigquery/us-states/us-states.avro")
	gcsRef.SourceFormat = bigquery.Avro
	loader := client.Dataset(datasetID).Table(tableID).LoaderFrom(gcsRef)
	// Default for import jobs is to append data to a table.  WriteTruncate
	// specifies that existing data should instead be replaced/overwritten.
	loader.WriteDisposition = bigquery.WriteTruncate

	job, err := loader.Run(ctx)
	if err != nil {
		return err
	}
	status, err := job.Wait(ctx)
	if err != nil {
		return err
	}

	if status.Err() != nil {
		return fmt.Errorf("job completed with error: %v", status.Err())
	}
	return nil
}

Java

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Java 设置说明进行操作。 如需了解详情,请参阅 BigQuery Java API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.TableId;

// Sample to overwrite the BigQuery table data by loading a AVRO file from GCS
public class LoadAvroFromGCSTruncate {

  public static void runLoadAvroFromGCSTruncate() {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    String sourceUri = "gs://cloud-samples-data/bigquery/us-states/us-states.avro";
    loadAvroFromGCSTruncate(datasetName, tableName, sourceUri);
  }

  public static void loadAvroFromGCSTruncate(
      String datasetName, String tableName, String sourceUri) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      TableId tableId = TableId.of(datasetName, tableName);
      LoadJobConfiguration loadConfig =
          LoadJobConfiguration.newBuilder(tableId, sourceUri)
              .setFormatOptions(FormatOptions.avro())
              // Set the write disposition to overwrite existing table data
              .setWriteDisposition(JobInfo.WriteDisposition.WRITE_TRUNCATE)
              .build();

      // Load data from a GCS Avro file into the table
      Job job = bigquery.create(JobInfo.of(loadConfig));
      // Blocks until this load table job completes its execution, either failing or succeeding.
      job = job.waitFor();
      if (job.isDone()) {
        System.out.println("Table is successfully overwritten by AVRO file loaded from GCS");
      } else {
        System.out.println(
            "BigQuery was unable to load into the table due to an error:"
                + job.getStatus().getError());
      }
    } catch (BigQueryException | InterruptedException e) {
      System.out.println("Column not added during load append \n" + e.toString());
    }
  }
}

Node.js

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Node.js 设置说明进行操作。 如需了解详情,请参阅 BigQuery Node.js API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

// Import the Google Cloud client libraries
const {BigQuery} = require('@google-cloud/bigquery');
const {Storage} = require('@google-cloud/storage');

// Instantiate clients
const bigquery = new BigQuery();
const storage = new Storage();

/**
 * This sample loads the Avro file at
 * https://storage.googleapis.com/cloud-samples-data/bigquery/us-states/us-states.avro
 *
 * TODO(developer): Replace the following lines with the path to your file.
 */
const bucketName = 'cloud-samples-data';
const filename = 'bigquery/us-states/us-states.avro';

async function loadTableGCSAvroTruncate() {
  /**
   * Imports a GCS file into a table and overwrites
   * table data if table already exists.
   */

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'us_states';

  // Configure the load job. For full list of options, see:
  // https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad
  const jobConfigurationLoad = {
    load: {
      sourceFormat: 'AVRO',
      writeDisposition: 'WRITE_TRUNCATE',
    },
  };

  // Load data from a Google Cloud Storage file into the table
  const [job] = await bigquery
    .dataset(datasetId)
    .table(tableId)
    .load(storage.bucket(bucketName).file(filename), jobConfigurationLoad);

  // load() waits for the job to finish
  console.log(`Job ${job.id} completed.`);

  // Check the job's status for errors
  const errors = job.status.errors;
  if (errors && errors.length > 0) {
    throw errors;
  }
}

Python

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Python 设置说明进行操作。 如需了解详情,请参阅 BigQuery Python API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

import io

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to create.
# table_id = "your-project.your_dataset.your_table_name

job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("name", "STRING"),
        bigquery.SchemaField("post_abbr", "STRING"),
    ],
)

body = io.BytesIO(b"Washington,WA")
client.load_table_from_file(body, table_id, job_config=job_config).result()
previous_rows = client.get_table(table_id).num_rows
assert previous_rows > 0

job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    source_format=bigquery.SourceFormat.AVRO,
)

uri = "gs://cloud-samples-data/bigquery/us-states/us-states.avro"
load_job = client.load_table_from_uri(
    uri, table_id, job_config=job_config
)  # Make an API request.

load_job.result()  # Waits for the job to complete.

destination_table = client.get_table(table_id)
print("Loaded {} rows.".format(destination_table.num_rows))

加载 Hive 分区的 Avro 数据

BigQuery 支持加载存储在 Cloud Storage 中的 Hive 分区 Avro 数据,并将 Hive 分区列作为目标 BigQuery 代管表中的列进行填充。如需了解详情,请参阅从 Cloud Storage 加载外部分区数据

Avro 转换

BigQuery 将 Avro 数据类型转化为以下 BigQuery 数据类型:

原始类型

不含 logicalType 特性的 Avro 数据类型 BigQuery 数据类型 备注
null BigQuery 会忽略这些值
布尔值 BOOLEAN
int INTEGER
long INTEGER
float FLOAT
double FLOAT
字节 BYTES
字符串 STRING 仅限 UTF-8

逻辑类型

默认情况下,BigQuery 会忽略大多数类型的 logicalType 特性,并改为使用基础 Avro 类型。要将 Avro 逻辑类型转换为相应的 BigQuery 数据类型,请使用 bq 命令行工具将 --use_avro_logical_types 标志设置为 true;或者在调用 jobs.insert 方法创建加载作业时,在作业资源中设置 useAvroLogicalTypes 属性。

下表显示 Avro 逻辑类型到 BigQuery 数据类型的转换。

Avro 逻辑类型 BigQuery 数据类型:已停用逻辑类型 BigQuery 数据类型:已启用逻辑类型
日期 INTEGER DATE
time-millis INTEGER TIME
time-micros INTEGER(转化自 LONG) TIME
timestamp-millis INTEGER(转化自 LONG) TIMESTAMP
timestamp-micros INTEGER(转化自 LONG) TIMESTAMP
local-timestamp-millis INTEGER(转化自 LONG) DATETIME
local-timestamp-micros INTEGER(转化自 LONG) DATETIME
时长 BYTES(由大小为 12 的 fixed 类型转换而来) BYTES(由大小为 12 的 fixed 类型转换而来)
decimal NUMERIC、BIGNUMERIC 或 STRING(请参阅 decimal 逻辑类型 NUMERIC、BIGNUMERIC 或 STRING(请参阅 decimal 逻辑类型

如需详细了解 Avro 数据类型,请参阅 Apache Avro™ 1.8.2 规范

日期逻辑类型

在要加载的任何 Avro 文件中,您必须按以下格式指定日期逻辑类型:

{
       "type": {"logicalType": "date", "type": "int"},
       "name": "date_field"
}

decimal 逻辑类型

Decimal 逻辑类型可以转换为 NUMERICBIGNUMERICSTRING 类型。转换后的类型取决于 decimal 逻辑类型的精度和比例参数以及指定的 decimal 目标类型。按如下方式指定小数目标类型:

为了实现向后兼容性,如果未指定小数目标类型,您可以将包含 decimal 逻辑类型的 bytes 列加载到现有 ID 的 BYTES 列中表。在这种情况下,系统会忽略 Avro 文件中的列的 decimal 逻辑类型。此转化模式已弃用,将来可能会移除。

如需详细了解 Avro decimal 逻辑类型,请参阅 Apache Avro™ 1.8.2 规范

时间逻辑类型

在要加载的任何 Avro 文件中,您必须按以下任一格式指定时间逻辑类型。

毫秒级精度:

{
       "type": {"logicalType": "time-millis", "type": "int"},
       "name": "time_millis_field"
}

微秒精度:

{
       "type": {"logicalType": "time-micros", "type": "int"},
       "name": "time_micros_field"
}

时间戳逻辑类型

在要加载的任何 Avro 文件中,您必须按以下任一格式指定时间戳逻辑类型。

毫秒级精度:

{
       "type": {"logicalType": "timestamp-millis", "type": "long"},
       "name": "timestamp_millis_field"
}

微秒精度:

{
       "type": {"logicalType": "timestamp-micros", "type": "long"},
       "name": "timestamp_micros_field"
}

本地时间戳逻辑类型

在要加载的任何 Avro 文件中,您必须按以下格式之一指定本地时间戳逻辑类型。

毫秒级精度:

{
       "type": {"logicalType": "local-timestamp-millis", "type": "long"},
       "name": "local_timestamp_millis_field"
}

微秒精度:

{
       "type": {"logicalType": "local-timestamp-micros", "type": "long"},
       "name": "local_timestamp_micros_field"
}

复合类型

Avro 数据类型 BigQuery 数据类型 备注
记录 RECORD
  • 忽略别名
  • 文档转换为字段描述
  • 默认值在读取时设置
  • 忽略顺序
  • 删除递归字段(仅保留递归字段的第一级嵌套)
枚举 STRING
  • 字符串是枚举的符号值
  • 忽略别名
  • 文档转换为字段描述
数组 重复字段 不支持数组的数组。忽略仅包含 NULL 类型的数组。
map<T> RECORD BigQuery 将 Avro map<T> 字段转换为包含两个字段(键和值)的重复 RECORD。BigQuery 将键存储为 STRING,并将值转换为 BigQuery 中相应的数据类型。
union
  • 可以为 Null 的字段
  • 包含可以为 Null 的字段列表的 RECORD
  • 当 union 只有一个非 null 类型时,它会转换为可以为 Null 的字段。
  • 否则,它将转换为包含可以为 Null 的字段列表的 RECORD。在读取时只会设置其中一个字段。
固定 BYTES
  • 忽略别名
  • 忽略大小

限制

  • BigQuery 不支持嵌套数组格式。使用此格式的 Avro 文件必须先进行转换,然后才能导入。
  • 在 Avro 文件中,名称和全名的命名空间只能包含字母数字字符和下划线字符 _。以下正则表达式显示了允许的字符:[A-Za-z_][A-Za-z0-9_]*

如需了解详情,请参阅 BigQuery 加载作业限制