使用旧版流式传输 API

本文档介绍如何使用旧式 tabledata.insertAll 方法将数据流式传输到 BigQuery 中。

对于新项目,我们建议使用 BigQuery Storage Write API,而不是使用 tabledata.insertAll 方法。Storage Write API 价格更低且功能更强大,包括“正好一次”传送语义。如果您要将现有项目从 tabledata.insertAll 方法迁移到 Storage Write API,我们建议您选择默认流tabledata.insertAll 方法仍然完全受支持。

准备工作

  1. 确保您对包含目标表的数据集具有写入权限。除非使用的是模板表,否则在开始向目标表写入数据之前该表就必须存在。如需详细了解模板表,请参阅使用模板表自动创建表

  2. 查看流式传输数据的配额政策

  3. Make sure that billing is enabled for your Google Cloud project.

  4. 不支持通过免费层级流式传输。如果您在未启用结算功能的情况下尝试使用流式传输,则会收到以下错误:BigQuery: Streaming insert is not allowed in the free tier.

  5. 授予为用户提供执行本文档中的每个任务所需权限的 Identity and Access Management (IAM) 角色。

所需权限

如需将数据流式传输到 BigQuery 中,您需要拥有以下 IAM 权限:

  • bigquery.tables.updateData(可让您在表中插入数据)
  • bigquery.tables.get(可让您获取表元数据)
  • bigquery.datasets.get(可让您获取数据集元数据)
  • bigquery.tables.create(如果您使用模板表自动创建表,则为必需)

以下预定义 IAM 角色都包含将数据流式传输到 BigQuery 中所需的权限:

  • roles/bigquery.dataEditor
  • roles/bigquery.dataOwner
  • roles/bigquery.admin

如需详细了解 BigQuery 中的 IAM 角色和权限,请参阅预定义的角色和权限

将数据流式插入到 BigQuery

C#

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 C# 设置说明进行操作。如需了解详情,请参阅 BigQuery C# API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证


using Google.Cloud.BigQuery.V2;

public class BigQueryTableInsertRows
{
    public void TableInsertRows(
        string projectId = "your-project-id",
        string datasetId = "your_dataset_id",
        string tableId = "your_table_id"
    )
    {
        BigQueryClient client = BigQueryClient.Create(projectId);
        BigQueryInsertRow[] rows = new BigQueryInsertRow[]
        {
            // The insert ID is optional, but can avoid duplicate data
            // when retrying inserts.
            new BigQueryInsertRow(insertId: "row1") {
                { "name", "Washington" },
                { "post_abbr", "WA" }
            },
            new BigQueryInsertRow(insertId: "row2") {
                { "name", "Colorado" },
                { "post_abbr", "CO" }
            }
        };
        client.InsertRows(datasetId, tableId, rows);
    }
}

Go

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Go 设置说明进行操作。如需了解详情,请参阅 BigQuery Go API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// Item represents a row item.
type Item struct {
	Name string
	Age  int
}

// Save implements the ValueSaver interface.
// This example disables best-effort de-duplication, which allows for higher throughput.
func (i *Item) Save() (map[string]bigquery.Value, string, error) {
	return map[string]bigquery.Value{
		"full_name": i.Name,
		"age":       i.Age,
	}, bigquery.NoDedupeID, nil
}

// insertRows demonstrates inserting data into a table using the streaming insert mechanism.
func insertRows(projectID, datasetID, tableID string) error {
	// projectID := "my-project-id"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %w", err)
	}
	defer client.Close()

	inserter := client.Dataset(datasetID).Table(tableID).Inserter()
	items := []*Item{
		// Item implements the ValueSaver interface.
		{Name: "Phred Phlyntstone", Age: 32},
		{Name: "Wylma Phlyntstone", Age: 29},
	}
	if err := inserter.Put(ctx, items); err != nil {
		return err
	}
	return nil
}

Java

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Java 设置说明进行操作。如需了解详情,请参阅 BigQuery Java API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sample to inserting rows into a table without running a load job.
public class TableInsertRows {

  public static void main(String[] args) {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    // Create a row to insert
    Map<String, Object> rowContent = new HashMap<>();
    rowContent.put("booleanField", true);
    rowContent.put("numericField", "3.14");
    // TODO(developer): Replace the row id with a unique value for each row.
    String rowId = "ROW_ID";
    tableInsertRows(datasetName, tableName, rowId, rowContent);
  }

  public static void tableInsertRows(
      String datasetName, String tableName, String rowId, Map<String, Object> rowContent) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      // Get table
      TableId tableId = TableId.of(datasetName, tableName);

      // Inserts rowContent into datasetName:tableId.
      InsertAllResponse response =
          bigquery.insertAll(
              InsertAllRequest.newBuilder(tableId)
                  // More rows can be added in the same RPC by invoking .addRow() on the builder.
                  // You can omit the unique row ids to disable de-duplication.
                  .addRow(rowId, rowContent)
                  .build());

      if (response.hasErrors()) {
        // If any of the insertions failed, this lets you inspect the errors
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          System.out.println("Response error: \n" + entry.getValue());
        }
      }
      System.out.println("Rows successfully inserted into table");
    } catch (BigQueryException e) {
      System.out.println("Insert operation not performed \n" + e.toString());
    }
  }
}

Node.js

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Node.js 设置说明进行操作。如需了解详情,请参阅 BigQuery Node.js API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

// Import the Google Cloud client library
const {BigQuery} = require('@google-cloud/bigquery');
const bigquery = new BigQuery();

async function insertRowsAsStream() {
  // Inserts the JSON objects into my_dataset:my_table.

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'my_table';
  const rows = [
    {name: 'Tom', age: 30},
    {name: 'Jane', age: 32},
  ];

  // Insert data into a table
  await bigquery.dataset(datasetId).table(tableId).insert(rows);
  console.log(`Inserted ${rows.length} rows`);
}

PHP

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 PHP 设置说明进行操作。如需了解详情,请参阅 BigQuery PHP API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

use Google\Cloud\BigQuery\BigQueryClient;

/**
 * Stream data into bigquery
 *
 * @param string $projectId The project Id of your Google Cloud Project.
 * @param string $datasetId The BigQuery dataset ID.
 * @param string $tableId The BigQuery table ID.
 * @param string $data Json encoded data For eg,
 *    $data = json_encode([
 *       "field1" => "value1",
 *       "field2" => "value2",
 *    ]);
 */
function stream_row(
    string $projectId,
    string $datasetId,
    string $tableId,
    string $data
): void {
    // instantiate the bigquery table service
    $bigQuery = new BigQueryClient([
      'projectId' => $projectId,
    ]);
    $dataset = $bigQuery->dataset($datasetId);
    $table = $dataset->table($tableId);

    $data = json_decode($data, true);
    $insertResponse = $table->insertRows([
      ['data' => $data],
      // additional rows can go here
    ]);
    if ($insertResponse->isSuccessful()) {
        print('Data streamed into BigQuery successfully' . PHP_EOL);
    } else {
        foreach ($insertResponse->failedRows() as $row) {
            foreach ($row['errors'] as $error) {
                printf('%s: %s' . PHP_EOL, $error['reason'], $error['message']);
            }
        }
    }
}

Python

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Python 设置说明进行操作。如需了解详情,请参阅 BigQuery Python API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {"full_name": "Phred Phlyntstone", "age": 32},
    {"full_name": "Wylma Phlyntstone", "age": 29},
]

errors = client.insert_rows_json(table_id, rows_to_insert)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

Ruby

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Ruby 设置说明进行操作。如需了解详情,请参阅 BigQuery Ruby API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

require "google/cloud/bigquery"

def table_insert_rows dataset_id = "your_dataset_id", table_id = "your_table_id"
  bigquery = Google::Cloud::Bigquery.new
  dataset  = bigquery.dataset dataset_id
  table    = dataset.table table_id

  row_data = [
    { name: "Alice", value: 5  },
    { name: "Bob",   value: 10 }
  ]
  response = table.insert row_data

  if response.success?
    puts "Inserted rows successfully"
  else
    puts "Failed to insert #{response.error_rows.count} rows"
  end
end

插入行时无需填充 insertID 字段。以下示例展示了如何避免在流式插入数据时为每行发送 insertID

Java

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Java 设置说明进行操作。如需了解详情,请参阅 BigQuery Java API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import com.google.common.collect.ImmutableList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sample to insert rows without row ids in a table
public class TableInsertRowsWithoutRowIds {

  public static void main(String[] args) {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    tableInsertRowsWithoutRowIds(datasetName, tableName);
  }

  public static void tableInsertRowsWithoutRowIds(String datasetName, String tableName) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      final BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
      // Create rows to insert
      Map<String, Object> rowContent1 = new HashMap<>();
      rowContent1.put("stringField", "Phred Phlyntstone");
      rowContent1.put("numericField", 32);
      Map<String, Object> rowContent2 = new HashMap<>();
      rowContent2.put("stringField", "Wylma Phlyntstone");
      rowContent2.put("numericField", 29);
      InsertAllResponse response =
          bigquery.insertAll(
              InsertAllRequest.newBuilder(TableId.of(datasetName, tableName))
                  // No row ids disable de-duplication, and also disable the retries in the Java
                  // library.
                  .setRows(
                      ImmutableList.of(
                          InsertAllRequest.RowToInsert.of(rowContent1),
                          InsertAllRequest.RowToInsert.of(rowContent2)))
                  .build());

      if (response.hasErrors()) {
        // If any of the insertions failed, this lets you inspect the errors
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          System.out.println("Response error: \n" + entry.getValue());
        }
      }
      System.out.println("Rows successfully inserted into table without row ids");
    } catch (BigQueryException e) {
      System.out.println("Insert operation not performed \n" + e.toString());
    }
  }
}

Python

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Python 设置说明进行操作。如需了解详情,请参阅 BigQuery Python API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {"full_name": "Phred Phlyntstone", "age": 32},
    {"full_name": "Wylma Phlyntstone", "age": 29},
]

errors = client.insert_rows_json(
    table_id, rows_to_insert, row_ids=[None] * len(rows_to_insert)
)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

发送日期和时间数据

对于日期和时间字段,请按如下所示在 tabledata.insertAll 方法中设置数据的格式:

类型 格式
DATE 格式为 "YYYY-MM-DD" 的字符串
DATETIME 格式为 "YYYY-MM-DD [HH:MM:SS]" 的字符串
TIME 格式为 "HH:MM:SS" 的字符串
TIMESTAMP 从 1970-01-01(Unix 纪元)开始计算的秒数,或格式为 "YYYY-MM-DD HH:MM[:SS]" 的字符串

发送范围数据

对于类型为 RANGE<T> 的字段,请将 tabledata.insertAll 方法中的数据格式设置为包含两个字段(startend)的 JSON 对象。startend 字段的 null 值表示无界限边界。这些字段必须采用类型为 T 的相同受支持 JSON 格式,其中 T 可以是 DATEDATETIMETIMESTAMP 中的一个。

在以下示例中,f_range_date 字段表示表中的 RANGE<DATE> 列。使用 tabledata.insertAll API 将一行插入此列。

{
    "f_range_date": {
        "start": "1970-01-02",
        "end": null
    }
}

流式插入的数据的可用性

在 BigQuery 成功确认 tabledata.insertAll 请求后,您便可立即使用 GoogleSQL 查询对数据进行实时分析。

最近流式插入到注入时间分区表的行的 _PARTITIONTIME 伪列临时具有 NULL 值。对于此类行,BigQuery 通常会在几分钟内在后台分配 PARTITIONTIME 列的最终非 NULL 值。在极少数情况下,此过程最长可能需要 90 分钟。

某些在最近流式插入的行可能无法在几分钟内用于表复制。在极少数情况下,此过程最长可能需要 90 分钟。 如需了解数据是否可用于表复制,请检查 tables.get 响应中是否存在名为 streamingBuffer 的部分。如果 streamingBuffer 部分不存在,则数据可供复制。您还可以使用 streamingBuffer.oldestEntryTime 字段标识流式传输缓冲区中记录的存在时间。

尽力去重功能

当您为插入的行提供 insertId 时,BigQuery 使用此 ID 支持尽力去重功能(最长可达一分钟)。也就是说,如果您在此时间段内多次将具有相同 insertId 的同一行流式插入到同一表中,则 BigQuery 可能会删除该行多次出现的重复数据,让数据仅出现一次。

系统认为具有相同 insertId 的行是相同的行。如果两行具有相同的 insertId,BigQuery 保留哪一行是不确定的。

去重功能通常用于分布式系统中由于错误(例如系统和 BigQuery 之间发生网络错误或 BigQuery 中发生内部错误)而无法确定流式插入的状态时的重试场景。如果重试插入操作,请对同一组行使用相同的 insertId,让 BigQuery 能够尝试删除重复的数据。如需了解详情,请参阅排查流式插入问题

BigQuery 提供的是尽力去重功能,不应依赖它作为您的数据中没有重复项的保证机制。此外,BigQuery 随时都有可能降低尽力去重功能的质量,以确保提供更高的数据可靠性和可用性。

如果您有严格的数据去重要求,可选择支持事务Google Cloud Datastore 服务。

停用尽力去重功能

对于每一插入行,您可以通过不填充其 insertId 字段,停用尽力去重功能。这是插入数据的推荐方法。

Apache Beam 和 Dataflow

如需在使用 Java 版 Apache Beam 的 BigQuery I/O 连接器时停用尽力去重功能,请使用 ignoreInsertIds() 方法

手动移除重复项

为确保在完成流式插入后不存在重复行,请使用以下手动过程:

  1. insertId 作为列添加到表架构中,并在每行的数据中加入 insertId 值。
  2. 在流式插入停止后,执行下列查询以检查重复项:

    #standardSQL
    SELECT
      MAX(count) FROM(
      SELECT
        ID_COLUMN,
        count(*) as count
      FROM
        `TABLE_NAME`
      GROUP BY
        ID_COLUMN)

    如果结果大于 1,则表示存在重复项。
  3. 如需移除重复项,请运行以下查询。指定目标表、允许大型结果并停用结果扁平化功能。

    #standardSQL
    SELECT
      * EXCEPT(row_number)
    FROM (
      SELECT
        *,
        ROW_NUMBER()
              OVER (PARTITION BY ID_COLUMN) row_number
      FROM
        `TABLE_NAME`)
    WHERE
      row_number = 1

有关重复项移除查询的注意事项:

  • 对于重复项移除查询,较为安全的策略是以新表为目标表。或者,也可以使用写入处置 WRITE_TRUNCATE 来以源表为目标表。
  • 重复项移除查询会在表架构的末尾添加一个值为 1row_number 列。该查询使用 GoogleSQL 中的 SELECT * EXCEPT 语句将 row_number 列从目标表中排除。#standardSQL 前缀用于为此查询启用 GoogleSQL。或者,您也可按特定列名称进行选择以忽略此列。
  • 要查询移除了重复项的实时数据,您还可以使用重复项移除查询来经由表创建视图。请注意,系统将根据视图中所选的列计算视图的查询费用,这可能导致字节扫描大小较大。

将数据流式插入到时间分区表

将数据流式插入到时间分区表时,每个分区都有一个流式缓冲区。如果您将 writeDisposition 属性设置为 WRITE_TRUNCATE,当执行会覆盖分区的加载、查询或复制作业时,系统会保留流式缓冲区。如果您想移除流式缓冲区,请对该分区调用 tables.get 以验证流式缓冲区是否为空。

提取时间分区

流式传输到注入时间分区表时,BigQuery 会根据当前世界协调时间 (UTC) 推断目标分区。

新到达的数据会临时放置在 __UNPARTITIONED__ 分区中,同时存储在流式缓冲区中。当未分区数据累积到足够多时,BigQuery 会将数据进行分区并放置到正确的分区中。但是,对于数据移出 __UNPARTITIONED__ 分区所需的时间,没有服务等级协议 (SLA)。查询可通过使用某一伪列([_PARTITIONTIME] 或 [_PARTITIONDATE],具体取决于您的首选数据类型)过滤掉 __UNPARTITIONED__ 分区中的 NULL 值,从查询中排除流式缓冲区中的数据。

如果您要将数据流式插入每日分区表,则可以通过在 insertAll 请求中添加分区修饰器来替换推断出的日期。在 tableId 参数中添加修饰器。例如,您可以使用以下分区修饰器将数据流式插入到 table1 表中与 2021-03-01 对应的分区:

table1$20210301

在使用分区修饰器流式插入数据时,您可以将数据流式插入到日期介于当前日期(基于当前世界协调时间 (UTC))之前 31 天至之后 16 天之间的分区。如需将数据写入到日期在这些允许范围之外的分区,请改为使用加载作业或查询作业,相关说明请参阅对分区表数据执行附加和覆盖操作

只有每日分区表支持使用分区修饰器进行流式插入。每小时、每月或每年分区表不支持此操作。

如需进行测试,您可以使用 bq 命令行工具 bq insert CLI 命令。例如,以下命令会将一行数据流式传输到名为 mydataset.mytable 的分区表中日期为 2017 年 1 月 1 日 ($20170101) 的一个分区内:

echo '{"a":1, "b":2}' | bq insert 'mydataset.mytable$20170101'

时间单位列分区

您可以将数据流式插入到按 DATEDATETIMETIMESTAMP 列分区且列值介于过去 5 年到未来 1 年之间的表。不在此范围内的数据会被拒绝。

流式插入数据时,数据最初放在 __UNPARTITIONED__ 分区中。当未分区的数据累积到足够多时,BigQuery 会自动对数据进行重新分区,并将其放入相应的分区中。 但是,对于数据移出 __UNPARTITIONED__ 分区所需的时间,没有服务等级协议 (SLA)。

  • 注意:每日分区的处理方式与每小时、每月和每年分区不同。只有超出日期范围(过去 7 天到未来 3 天)的数据才会提取到 UNPARTITIONED 分区,等待重新分区。另一方面,对于每小时分区表,数据始终会被提取到 UNPARTITIONED 分区,然后进行重新分区。

使用模板表自动创建表

模板表提供了一种将逻辑表拆分为多个较小表的机制,以创建较小的数据集(例如,按用户 ID)。模板表有许多限制,如下所述。建议使用分区表聚簇表来实现此行为。

如需通过 BigQuery API 使用模板表,请在 insertAll 请求中添加 templateSuffix 参数。对于 bq 命令行工具,请在您的 insert 命令中添加 template_suffix 标志。如果 BigQuery 检测到 templateSuffix 参数或 template_suffix 标志,它会将目标表视为基本模板,并创建一个架构与目标表相同且名称中包含指定后缀的新表:

<targeted_table_name> + <templateSuffix>

使用模板表,免去了逐一创建每个表以及为每个表指定架构的麻烦。您只需创建单个模板并提供不同后缀,BigQuery 便可为您创建新表。BigQuery 会将这些表置于同一项目和数据集中。

通过模板表创建的表通常只需几秒即可使用。极少数情况下可能需要更多时间。

更改模板表架构

如果更改模板表架构,则后续生成的所有表都将使用更新的架构。除非现有表仍具有流式传输缓冲区,否则之前生成的表不受影响。

对于仍具有流式缓冲区的现有表,如果以向后兼容的方式修改模板表架构,则以主动流式插入方式生成的表的架构也会进行更新。但是,如果不以向后兼容的方式修改模板表架构,则所有使用旧架构的缓冲数据都将丢失。此外,如果已生成的现有表使用现已不兼容的旧架构,则您也无法将新数据流式插入到这些表中。

更改模板表架构后,请等待更改传播完成,然后再尝试插入新数据或查询生成的表。插入新字段的请求几分钟就会成功。而尝试查询新字段最多可能需要等待 90 分钟。

如果要更改已生成的表的架构,则直到通过模板表的流式插入停止,并且 tables.get() 响应中没有已生成表的流式插入统计信息部分(表示表中没有缓冲的数据),才能更改架构。

分区表聚簇表不会遇到上述限制,建议您采用这种机制。

模板表详情

模板后缀值
templateSuffix(或 --template_suffix)值只能包含字母(a-z、A-Z)、数字 (0-9) 或下划线 (_)。表名称和表后缀总共不得超过 1024 个字符。
配额

模板表受到流式插入配额的限制。您的项目每秒最多可以发出 10 个包含模板表的表(类似于 tables.insert API)。此配额仅适用于正在创建的表,而不适用于要修改的表。

如果您的应用需要每秒创建 10 个以上的表,我们建议您使用聚簇表。例如,您可以将高基数表 ID 放入单个聚簇表的键列中。

生存时间

生成的表会从数据集继承到期时间。与普通的流式插入数据一样,无法立即复制生成的表。

去重功能

仅会在针对目标表的相同引用之间发生重复信息删除。 例如,如果同时使用模板表和常规 insertAll 命令将数据流式插入到生成的表,则在通过模板表和常规 insertAll 命令插入的行之间不会发生去重操作。

视图

模板表和生成的表不应为视图。

排查流式插入问题

以下部分讨论如何排查在使用旧版流式插入 API 将数据流式插入到 BigQuery 时发生的错误。如需详细了解如何解决流式插入的配额错误,请参阅流式插入配额错误

失败 HTTP 响应代码

如果收到失败的 HTTP 响应代码(如网络连接错误),则无法确定流式插入是否成功。如果尝试重新发送请求,则可能导致最终表中出现重复的行。为了避免表中出现重复的内容,请在发送请求时设置 insertId 属性。BigQuery 会使用 insertId 属性执行去重操作。

如果您收到权限错误、表名称无效错误或超出配额错误,则系统不会插入任何行,并且整个请求都会失败。

成功 HTTP 响应代码

即使您收到成功 HTTP 响应代码,也需要检查响应的 insertErrors 属性才能确定是否成功插入行,因为 BigQuery 可能只是成功插入了部分行。 您可能会遇到以下情况之一:

  • 已成功插入所有行。如果 insertErrors 属性是空列表,则表示所有行均已成功插入。
  • 已成功插入一些行。insertErrors 属性中指示的行并未插入,而其他所有行则已成功插入(除非任何行中存在架构不匹配的情况)。errors 属性详细说明了每个未成功插入行的失败原因。index 属性指示请求中与错误对应的行索引(从 0 开始)。
  • 未成功插入任何行。如果 BigQuery 在请求的个别行上遇到架构不匹配的情况,则系统不会插入任何行,并会针对每一行(即使是架构匹配的行)返回一个 insertErrors 条目。对于架构匹配的行,其所对应错误的 reason 属性将设置为 stopped,因此您可以按原样重新发送这些行。而对于插入失败的行,其会包含有关架构不匹配情况的详细信息。如需了解每种 BigQuery 数据类型支持的协议缓冲区类型,请参阅数据类型转换

流式插入的元数据错误

由于 BigQuery 的流式插入 API 可以实现高插入速率,因此在与流式插入系统进行交互时,对底层表元数据展示的修改最终将保持一致。大多数时候,元数据更改会在几分钟内完成传送,但在此期间,API 响应可能会反映表的不一致状态。

部分情况包括:

  • 架构更改。针对最近接收了流式插入内容的表修改架构时,响应可能会指出架构不匹配错误,因为流式插入系统可能不会立即反映架构更改。
  • 表创建/删除。如果将数据流式插入到不存在的表,则系统将返回 notFound 响应的变体。后续流式插入作业可能无法立即识别在响应中创建的表。同样地,删除或重新创建表可能造成在一段时间内流式插入实际上传输到旧表。流式插入可能不会出现在新表中。
  • 表截断。截断表的数据(通过使用 WRITE_TRUNCATE 的 writeDisposition 的查询作业)同样可能导致在一致性期间的后续插入丢失。

数据丢失/不可用

流式插入临时驻留在写入优化存储空间中,该存储空间具有不同于代管式存储空间的可用性特征。BigQuery 中的某些操作不与写入优化存储空间交互,例如表复制作业和 tabledata.list 等 API 方法。最近流式插入的数据将不会出现在目标表或输出中。