使用旧版流式传输 API

本文档介绍如何使用旧式 tabledata.insertAll 方法将数据流式传输到 BigQuery 中。

对于新项目,我们建议使用 BigQuery Storage Write API,而不是使用 tabledata.insertAll 方法。Storage Write API 价格更低且功能更强大,包括“正好一次”传送语义。tabledata.insertAll 方法仍然完全受支持。

准备工作

  1. 确保您对包含目标表的数据集具有写入权限。除非使用的是模板表,否则在开始向目标表写入数据之前该表就必须存在。如需详细了解模板表,请参阅使用模板表自动创建表

  2. 查看流式传输数据的配额政策

  3. 确保您的 Cloud 项目已启用结算功能。 了解如何确认您的项目是否已启用结算功能

  4. 不支持通过免费层级流式传输。如果您在未启用结算功能的情况下尝试使用流式传输,则会收到以下错误:BigQuery: Streaming insert is not allowed in the free tier.

  5. 授予为用户提供执行本文档中的每个任务所需权限的 Identity and Access Management (IAM) 角色。

所需权限

如需将数据流式传输到 BigQuery 中,您需要拥有以下 IAM 权限:

  • bigquery.tables.updateData(可让您在表中插入数据)
  • bigquery.tables.get(可让您获取表元数据)
  • bigquery.datasets.get(可让您获取数据集元数据)
  • bigquery.tables.create(如果您使用模板表自动创建表,则为必需)

以下预定义 IAM 角色都包含将数据流式传输到 BigQuery 中所需的权限:

  • roles/bigquery.dataEditor
  • roles/bigquery.dataOwner
  • roles/bigquery.admin

如需详细了解 BigQuery 中的 IAM 角色和权限,请参阅预定义的角色和权限

将数据流式插入到 BigQuery

C#

尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 C# 设置说明进行操作。如需了解详情,请参阅 BigQuery C# API 参考文档


using Google.Cloud.BigQuery.V2;
using System;

public class BigQueryTableInsertRows
{
    public void TableInsertRows(
        string projectId = "your-project-id",
        string datasetId = "your_dataset_id",
        string tableId = "your_table_id"
    )
    {
        BigQueryClient client = BigQueryClient.Create(projectId);
        BigQueryInsertRow[] rows = new BigQueryInsertRow[]
        {
            // The insert ID is optional, but can avoid duplicate data
            // when retrying inserts.
            new BigQueryInsertRow(insertId: "row1") {
                { "name", "Washington" },
                { "post_abbr", "WA" }
            },
            new BigQueryInsertRow(insertId: "row2") {
                { "name", "Colorado" },
                { "post_abbr", "CO" }
            }
        };
        client.InsertRows(datasetId, tableId, rows);
    }
}

Go

尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Go 设置说明进行操作。如需了解详情,请参阅 BigQuery Go API 参考文档

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// Item represents a row item.
type Item struct {
	Name string
	Age  int
}

// Save implements the ValueSaver interface.
// This example disables best-effort de-duplication, which allows for higher throughput.
func (i *Item) Save() (map[string]bigquery.Value, string, error) {
	return map[string]bigquery.Value{
		"full_name": i.Name,
		"age":       i.Age,
	}, bigquery.NoDedupeID, nil
}

// insertRows demonstrates inserting data into a table using the streaming insert mechanism.
func insertRows(projectID, datasetID, tableID string) error {
	// projectID := "my-project-id"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	inserter := client.Dataset(datasetID).Table(tableID).Inserter()
	items := []*Item{
		// Item implements the ValueSaver interface.
		{Name: "Phred Phlyntstone", Age: 32},
		{Name: "Wylma Phlyntstone", Age: 29},
	}
	if err := inserter.Put(ctx, items); err != nil {
		return err
	}
	return nil
}

Java

试用此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Java 设置说明进行操作。如需了解详情,请参阅 BigQuery Java API 参考文档

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sample to inserting rows into a table without running a load job.
public class TableInsertRows {

  public static void main(String[] args) {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    // Create a row to insert
    Map<String, Object> rowContent = new HashMap<>();
    rowContent.put("booleanField", true);
    rowContent.put("numericField", "3.14");

    tableInsertRows(datasetName, tableName, rowContent);
  }

  public static void tableInsertRows(
      String datasetName, String tableName, Map<String, Object> rowContent) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      // Get table
      TableId tableId = TableId.of(datasetName, tableName);

      // Inserts rowContent into datasetName:tableId.
      InsertAllResponse response =
          bigquery.insertAll(
              InsertAllRequest.newBuilder(tableId)
                  // More rows can be added in the same RPC by invoking .addRow() on the builder.
                  // You can also supply optional unique row keys to support de-duplication
                  // scenarios.
                  .addRow(rowContent)
                  .build());

      if (response.hasErrors()) {
        // If any of the insertions failed, this lets you inspect the errors
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          System.out.println("Response error: \n" + entry.getValue());
        }
      }
      System.out.println("Rows successfully inserted into table");
    } catch (BigQueryException e) {
      System.out.println("Insert operation not performed \n" + e.toString());
    }
  }
}

Node.js

尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 BigQuery Node.js API 参考文档

// Import the Google Cloud client library
const {BigQuery} = require('@google-cloud/bigquery');
const bigquery = new BigQuery();

async function insertRowsAsStream() {
  // Inserts the JSON objects into my_dataset:my_table.

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'my_table';
  const rows = [
    {name: 'Tom', age: 30},
    {name: 'Jane', age: 32},
  ];

  // Insert data into a table
  await bigquery
    .dataset(datasetId)
    .table(tableId)
    .insert(rows);
  console.log(`Inserted ${rows.length} rows`);
}

PHP

尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 PHP 设置说明进行操作。如需了解详情,请参阅 BigQuery PHP API 参考文档

use Google\Cloud\BigQuery\BigQueryClient;

/** Uncomment and populate these variables in your code */
// $projectId = 'The Google project ID';
// $datasetId = 'The BigQuery dataset ID';
// $tableId   = 'The BigQuery table ID';
// $data = [
//     "field1" => "value1",
//     "field2" => "value2",
// ];

// instantiate the bigquery table service
$bigQuery = new BigQueryClient([
    'projectId' => $projectId,
]);
$dataset = $bigQuery->dataset($datasetId);
$table = $dataset->table($tableId);

$insertResponse = $table->insertRows([
    ['data' => $data],
    // additional rows can go here
]);
if ($insertResponse->isSuccessful()) {
    print('Data streamed into BigQuery successfully' . PHP_EOL);
} else {
    foreach ($insertResponse->failedRows() as $row) {
        foreach ($row['errors'] as $error) {
            printf('%s: %s' . PHP_EOL, $error['reason'], $error['message']);
        }
    }
}

Python

尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Python 设置说明进行操作。如需了解详情,请参阅 BigQuery Python API 参考文档

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {u"full_name": u"Phred Phlyntstone", u"age": 32},
    {u"full_name": u"Wylma Phlyntstone", u"age": 29},
]

errors = client.insert_rows_json(table_id, rows_to_insert)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

Ruby

尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Ruby 设置说明进行操作。如需了解详情,请参阅 BigQuery Ruby API 参考文档

require "google/cloud/bigquery"

def table_insert_rows dataset_id = "your_dataset_id", table_id = "your_table_id"
  bigquery = Google::Cloud::Bigquery.new
  dataset  = bigquery.dataset dataset_id
  table    = dataset.table table_id

  row_data = [
    { name: "Alice", value: 5  },
    { name: "Bob",   value: 10 }
  ]
  response = table.insert row_data

  if response.success?
    puts "Inserted rows successfully"
  else
    puts "Failed to insert #{response.error_rows.count} rows"
  end
end

插入行时无需填充 insertID 字段。 以下示例展示了如何避免在流式插入数据时为每行发送 insertID

Python

尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Python 设置说明进行操作。如需了解详情,请参阅 BigQuery Python API 参考文档

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {u"full_name": u"Phred Phlyntstone", u"age": 32},
    {u"full_name": u"Wylma Phlyntstone", u"age": 29},
]

errors = client.insert_rows_json(
    table_id, rows_to_insert, row_ids=[None] * len(rows_to_insert)
)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

发送日期/时间数据

对于数据/时间字段,请按如下所示在 tabledata.insertAll 方法中设置数据的格式:

类型 格式
DATE 格式为 "YYYY-MM-DD" 的字符串
DATETIME 格式为 "YYYY-MM-DD [HH:MM:SS]" 的字符串
TIME 格式为 "HH:MM:SS" 的字符串
TIMESTAMP 从 1970-01-01(Unix 纪元)开始计算的秒数,或格式为 "YYYY-MM-DD HH:MM[:SS]" 的字符串

检查数据可用性

在首次将数据流式插入表后的几秒钟内,便可使用流式插入的数据进行实时分析。在极少数情况下(如服务中断),流式缓冲区中的数据可能暂时不可用。当数据不可用时,查询仍可成功运行,只是会跳过仍位于流式缓冲区中的某些数据。这些查询将在 bigquery.jobs.getQueryResultserrors 字段中、对 bigquery.jobs.query 的响应中或 bigquery.jobs.getstatus.errors 字段中包含一个警告。

您最多可能需要等待 90 分钟,才能对数据执行复制操作。此外,在将数据流式插入到分区表时,流式缓冲区中的数据对应的 _PARTITIONTIME 伪列值为 NULL。如需了解数据是否可用于复制,请检查 tables.get 响应中是否存在名为 streamingBuffer 的部分。如果该部分不存在,则数据应可用于复制,且对应的 _PARTITIONTIME 伪列应具有非 NULL 值。此外,您还可利用 streamingBuffer.oldestEntryTime 字段来识别流式缓冲区中记录的存在时间。

尽力去重功能

当您为插入的行提供 insertId 时,BigQuery 使用此 ID 支持尽力去重功能(最长可达一分钟)。也就是说,如果您尝试在此时间段内多次将具有相同 insertId 的同一行流式插入相同的表,则 BigQuery 可能会删除该行多次出现的重复数据,让数据仅出现一次。

系统认为具有相同 insertId 的行是相同的行。如果两行具有相同的 insertId,BigQuery 保留哪一行是不确定的。

去重功能通常用于分布式系统中由于错误(例如系统和 BigQuery 之间发生网络错误或 BigQuery 中发生内部错误)而无法确定流式插入的状态时的重试场景。如果重试插入操作,请对同一组行使用相同的 insertId,让 BigQuery 能够尝试删除重复的数据。如需了解详情,请参阅排查流式插入问题

BigQuery 提供的是尽力去重功能,不应依赖它作为您的数据中没有重复项的保证机制。此外,BigQuery 随时都有可能降低尽力去重功能的质量,以确保提供更高的数据可靠性和可用性。

如果您有严格的数据去重要求,可选择支持事务Google Cloud Datastore 服务。

停用尽力去重功能

对于每一插入行,您可以通过不填充其 insertId 字段,停用尽力去重功能。这是插入数据的推荐方法。

Apache Beam 和 Dataflow

如需在使用 Java 版 Apache Beam 的 BigQuery I/O 连接器时停用尽力去重功能,请使用 ignoreInsertIds() 方法

手动移除重复项

您可以通过下列手动过程,确保在流式插入完成后没有重复的行。

  1. insertId 作为列添加到表架构中,并在每行的数据中加入 insertId 值。
  2. 在流式插入停止后,执行下列查询以检查重复项:

    #standardSQL
    SELECT
      MAX(count) FROM(
      SELECT
        ID_COLUMN,
        count(*) as count
      FROM
        `TABLE_NAME`
      GROUP BY
        ID_COLUMN)

    如果结果大于 1,则表示存在重复项。
  3. 要移除重复项,请执行以下查询。应指定目标表、允许大型结果并停用结果扁平化功能。

    #standardSQL
    SELECT
      * EXCEPT(row_number)
    FROM (
      SELECT
        *,
        ROW_NUMBER()
              OVER (PARTITION BY ID_COLUMN) row_number
      FROM
        `TABLE_NAME`)
    WHERE
      row_number = 1

有关重复项移除查询的注意事项:

  • 对于重复项移除查询,较为安全的策略是以新表为目标表。或者,也可以使用写入处置 WRITE_TRUNCATE 来以源表为目标表。
  • 重复项移除查询会在表架构的末尾添加一个值为 1row_number 列。该查询使用标准 SQL 中的 SELECT * EXCEPT 语句将 row_number 列从目标表中排除。#standardSQL 前缀用于为此查询启用标准 SQL。或者,您也可按特定列名称进行选择以忽略此列。
  • 要查询移除了重复项的实时数据,您还可以使用重复项移除查询来经由表创建视图。请注意,将根据视图中所选的列计算视图的查询费用,这可能导致字节扫描大小较大。

将数据流式插入到时间分区表

将数据流式插入到时间分区表时,每个分区都有一个流式缓冲区。如果您将 writeDisposition 属性设置为 WRITE_TRUNCATE,当执行会覆盖分区的加载、查询或复制作业时,系统会保留流式缓冲区。如果您想移除流式缓冲区,请对该分区调用 tables.get 以验证流式缓冲区是否为空。

提取时间分区

流式传输到注入时间分区表时,BigQuery 会根据当前世界协调时间 (UTC) 推断目标分区。

新到达的数据会临时放置在 __UNPARTITIONED__ 分区中,同时存储在流式缓冲区中。当未分区数据累积到足够多时,BigQuery 会将数据进行分区并放置到正确的分区中。查询可通过使用某一伪列([_PARTITIONTIME] 或 [_PARTITIONDATE],具体取决于您的首选数据类型)过滤掉 __UNPARTITIONED__ 分区中的 NULL 值,从查询中排除流式缓冲区中的数据。

如果您要将数据流式插入每日分区表,则可以通过在 insertAll 请求中添加分区修饰器来替换推断出的日期。在 tableId 参数中添加修饰器。例如,您可以使用以下分区修饰器将数据流式插入到 table1 表中与 2021-03-01 对应的分区:

table1$20210301

在使用分区修饰器流式插入数据时,您可以将数据流式插入到日期介于当前日期(基于当前世界协调时间 (UTC))之前 31 天至之后 16 天之间的分区。如需将数据写入到日期在这些允许范围之外的分区,请改为使用加载作业或查询作业,相关说明请参阅对分区表数据执行附加和覆盖操作

只有每日分区表支持使用分区修饰器进行流式插入。每小时、每月或每年分区表不支持此操作。

如需进行测试,您可以使用 bq 命令行工具 bq insert CLI 命令。例如,以下命令会将一行数据流式传输到名为 mydataset.mytable 的分区表中日期为 2017 年 1 月 1 日 ($20170101) 的一个分区内:

echo '{"a":1, "b":2}' | bq insert 'mydataset.mytable$20170101'

时间单位列分区

您可以将数据流式插入到按 DATEDATETIMETIMESTAMP 列分区且列值介于过去 5 年到未来 1 年之间的表。不在此范围内的数据会被拒绝。

流式插入数据时,数据最初放在 __UNPARTITIONED__ 分区中。当未分区的数据累积到足够多时,BigQuery 会自动对数据进行重新分区,并将其放入相应的分区中。

使用模板表自动创建表

模板表提供了一种将逻辑表拆分为多个较小表的机制,以创建较小的数据集(例如,按用户 ID)。模板表有许多限制,如下所述。建议使用分区表聚簇表来实现此行为。

如需通过 BigQuery API 使用模板表,请在 insertAll 请求中添加 templateSuffix 参数。对于 bq 命令行工具,请将 template_suffix 标志添加到 insert 命令。如果 BigQuery 检测到 templateSuffix 参数或 template_suffix 标志,它会将目标表视为基本模板,并创建一个架构与目标表相同且名称中包含指定后缀的新表:

<targeted_table_name> + <templateSuffix>

使用模板表,免去了逐一创建每个表以及为每个表指定架构的麻烦。您只需创建单个模板并提供不同后缀,BigQuery 便可为您创建新表。BigQuery 会将这些表置于同一项目和数据集中。

通过模板表创建的表通常只需几秒即可使用。极少数情况下可能需要更长时间。

更改模板表架构

如果更改模板表架构,则后续生成的所有表都将使用更新的架构。除非现有表仍具有流式缓冲区,否则之前生成的表不受影响。

对于仍具有流式缓冲区的现有表,如果以向后兼容的方式修改模板表架构,则以主动流式插入方式生成的表的架构也会获得更新。但是,如果不以向后兼容的方式修改模板表架构,则所有使用旧架构的缓冲数据都将丢失。此外,如果已生成的现有表使用现已不兼容的旧架构,则您也无法将新的数据流式插入到这些表中。

更改模板表架构后,请等待更改传播完成,然后再尝试插入新的数据或查询生成的表。插入新字段的请求几分钟就会成功。而尝试查询新字段最多可能需要等待 90 分钟。

如果要更改已生成表的架构,则必须等到通过模板表的流式插入停止,并且 tables.get() 响应中没有已生成表的流式插入统计信息部分(表示表中没有缓冲的数据),才能更改架构。

分区表聚簇表不会遇到上述限制,建议您采用这种机制。

模板表详情

模板后缀值
templateSuffix(或 --template_suffix)值只能包含字母(a-z、A-Z)、数字 (0-9) 或下划线 (_)。表名称和表后缀总共不得超过 1024 个字符。
配额
模板表与其他表一样受到类似的流式插入配额限制。
生存时间
生成的表会从数据集继承到期时间。与普通的流式插入数据一样,无法立即复制生成的表。
去重功能
去重仅会发生在针对目标表的相同引用之间。 例如,如果同时使用模板表和常规 insertAll 命令将数据流式插入到生成的表,则在通过模板表和常规 insertAll 命令插入的行之间不会发生去重操作。
视图
模板表和生成的表不应为视图。

排查流式插入问题

要了解如何在流式插入期间排查错误,请参阅“排查错误”页面上的排查流式插入问题