将数据流式插入到 BigQuery

本页面介绍如何使用 tabledata.insertAll 方法将数据流式插入到 BigQuery。流式插入是批量加载来将数据提取到 BigQuery 的替代方案。使用流式插入时,您可以一次插入一条记录,也可以小批量插入(通常不超过 500 条)。

实际使用示例

如果您有实时收集大量数据的应用,则流式插入会是一个不错的选择。通常,这类应用具有以下标准:

  • 非事务性。大量、不断附加行。应用可容忍偶尔出现重复项或数据暂时不可用。
  • 汇总分析。执行查询通常是为了进行趋势分析,而不会选择单条或小范围记录。

事件跟踪就属于一种大量事件日志的情况。假设您有一个跟踪事件的移动应用。您的应用或移动服务器可以独立记录用户交互或系统错误,并将其流式插入到 BigQuery 中。您可分析此数据以判断整体趋势(例如高度交互或问题方面),并实时监控错误情况。

准备工作

  1. 确保您对包含目标表的数据集具有写入权限。除非使用的是模板表,否则在开始向目标表写入数据之前该表就必须存在。如需详细了解模板表,请参阅使用模板表自动创建表

  2. 查看流式插入数据的配额政策

  3. 确保您的 Cloud 项目已启用结算功能。 了解如何确认您的项目是否已启用结算功能

免费层级不支持流式插入功能。 如果您在未启用结算功能的情况下尝试使用流式插入,则会收到以下错误:BigQuery: Streaming insert is not allowed in the free tier.

所需权限

如需将数据流式插入到 BigQuery,您至少必须具有以下权限:

  • 可向表中插入数据的 bigquery.tables.updateData 权限
  • 可获取表元数据的 bigquery.tables.get 权限
  • 可获取数据集元数据的 bigquery.datasets.get 权限

如果使用模板表自动创建表,则还必须拥有 bigquery.tables.create 权限。

以下预定义的 Identity and Access Management (IAM) 角色可提供 bigquery.tables.updateDatabigquery.tables.create 权限:

  • bigquery.dataEditor
  • bigquery.dataOwner
  • bigquery.admin

如需详细了解 BigQuery 中的 IAM 角色和权限,请参阅预定义的角色和权限

将数据流式插入到 BigQuery

C#

尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 C# 设置说明进行操作。如需了解详情,请参阅 BigQuery C# API 参考文档


using Google.Cloud.BigQuery.V2;
using System;

public class BigQueryTableInsertRows
{
    public void TableInsertRows(
        string projectId = "your-project-id",
        string datasetId = "your_dataset_id",
        string tableId = "your_table_id"
    )
    {
        BigQueryClient client = BigQueryClient.Create(projectId);
        BigQueryInsertRow[] rows = new BigQueryInsertRow[]
        {
            // The insert ID is optional, but can avoid duplicate data
            // when retrying inserts.
            new BigQueryInsertRow(insertId: "row1") {
                { "name", "Washington" },
                { "post_abbr", "WA" }
            },
            new BigQueryInsertRow(insertId: "row2") {
                { "name", "Colorado" },
                { "post_abbr", "CO" }
            }
        };
        client.InsertRows(datasetId, tableId, rows);
    }
}

Go

尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Go 设置说明进行操作。如需了解详情,请参阅 BigQuery Go API 参考文档

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// Item represents a row item.
type Item struct {
	Name string
	Age  int
}

// Save implements the ValueSaver interface.
// This example disables best-effort de-duplication, which allows for higher throughput.
func (i *Item) Save() (map[string]bigquery.Value, string, error) {
	return map[string]bigquery.Value{
		"full_name": i.Name,
		"age":       i.Age,
	}, bigquery.NoDedupeID, nil
}

// insertRows demonstrates inserting data into a table using the streaming insert mechanism.
func insertRows(projectID, datasetID, tableID string) error {
	// projectID := "my-project-id"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	inserter := client.Dataset(datasetID).Table(tableID).Inserter()
	items := []*Item{
		// Item implements the ValueSaver interface.
		{Name: "Phred Phlyntstone", Age: 32},
		{Name: "Wylma Phlyntstone", Age: 29},
	}
	if err := inserter.Put(ctx, items); err != nil {
		return err
	}
	return nil
}

Java

试用此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Java 设置说明进行操作。如需了解详情,请参阅 BigQuery Java API 参考文档

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sample to inserting rows into a table without running a load job.
public class TableInsertRows {

  public static void main(String[] args) {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    // Create a row to insert
    Map<String, Object> rowContent = new HashMap<>();
    rowContent.put("booleanField", true);
    rowContent.put("numericField", "3.14");

    tableInsertRows(datasetName, tableName, rowContent);
  }

  public static void tableInsertRows(
      String datasetName, String tableName, Map<String, Object> rowContent) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      // Get table
      TableId tableId = TableId.of(datasetName, tableName);

      // Inserts rowContent into datasetName:tableId.
      InsertAllResponse response =
          bigquery.insertAll(
              InsertAllRequest.newBuilder(tableId)
                  // More rows can be added in the same RPC by invoking .addRow() on the builder.
                  // You can also supply optional unique row keys to support de-duplication
                  // scenarios.
                  .addRow(rowContent)
                  .build());

      if (response.hasErrors()) {
        // If any of the insertions failed, this lets you inspect the errors
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          System.out.println("Response error: \n" + entry.getValue());
        }
      }
      System.out.println("Rows successfully inserted into table");
    } catch (BigQueryException e) {
      System.out.println("Insert operation not performed \n" + e.toString());
    }
  }
}

Node.js

尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 BigQuery Node.js API 参考文档

// Import the Google Cloud client library
const {BigQuery} = require('@google-cloud/bigquery');
const bigquery = new BigQuery();

async function insertRowsAsStream() {
  // Inserts the JSON objects into my_dataset:my_table.

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'my_table';
  const rows = [
    {name: 'Tom', age: 30},
    {name: 'Jane', age: 32},
  ];

  // Insert data into a table
  await bigquery
    .dataset(datasetId)
    .table(tableId)
    .insert(rows);
  console.log(`Inserted ${rows.length} rows`);
}

PHP

尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 PHP 设置说明进行操作。如需了解详情,请参阅 BigQuery PHP API 参考文档

use Google\Cloud\BigQuery\BigQueryClient;

/** Uncomment and populate these variables in your code */
// $projectId = 'The Google project ID';
// $datasetId = 'The BigQuery dataset ID';
// $tableId   = 'The BigQuery table ID';
// $data = [
//     "field1" => "value1",
//     "field2" => "value2",
// ];

// instantiate the bigquery table service
$bigQuery = new BigQueryClient([
    'projectId' => $projectId,
]);
$dataset = $bigQuery->dataset($datasetId);
$table = $dataset->table($tableId);

$insertResponse = $table->insertRows([
    ['data' => $data],
    // additional rows can go here
]);
if ($insertResponse->isSuccessful()) {
    print('Data streamed into BigQuery successfully' . PHP_EOL);
} else {
    foreach ($insertResponse->failedRows() as $row) {
        foreach ($row['errors'] as $error) {
            printf('%s: %s' . PHP_EOL, $error['reason'], $error['message']);
        }
    }
}

Python

尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Python 设置说明进行操作。如需了解详情,请参阅 BigQuery Python API 参考文档

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {u"full_name": u"Phred Phlyntstone", u"age": 32},
    {u"full_name": u"Wylma Phlyntstone", u"age": 29},
]

errors = client.insert_rows_json(table_id, rows_to_insert)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

Ruby

尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Ruby 设置说明进行操作。如需了解详情,请参阅 BigQuery Ruby API 参考文档

require "google/cloud/bigquery"

def table_insert_rows dataset_id = "your_dataset_id", table_id = "your_table_id"
  bigquery = Google::Cloud::Bigquery.new
  dataset  = bigquery.dataset dataset_id
  table    = dataset.table table_id

  row_data = [
    { name: "Alice", value: 5  },
    { name: "Bob",   value: 10 }
  ]
  response = table.insert row_data

  if response.success?
    puts "Inserted rows successfully"
  else
    puts "Failed to insert #{response.error_rows.count} rows"
  end
end

插入行时无需填充 insertID 字段。 以下示例展示了如何避免在流式插入数据时为每行发送 insertID

Python

尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Python 设置说明进行操作。如需了解详情,请参阅 BigQuery Python API 参考文档

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {u"full_name": u"Phred Phlyntstone", u"age": 32},
    {u"full_name": u"Wylma Phlyntstone", u"age": 29},
]

errors = client.insert_rows_json(
    table_id, rows_to_insert, row_ids=[None] * len(rows_to_insert)
)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

检查数据可用性

在首次将数据流式插入表后的几秒钟内,便可使用流式插入的数据进行实时分析。在极少数情况下(如服务中断),流式缓冲区中的数据可能暂时不可用。当数据不可用时,查询仍可成功运行,只是会跳过仍位于流式缓冲区中的某些数据。这些查询将在 bigquery.jobs.getQueryResultserrors 字段中、对 bigquery.jobs.query 的响应中或 bigquery.jobs.getstatus.errors 字段中包含一个警告。

您最多可能需要等待 90 分钟,才能对数据执行复制操作。此外,在将数据流式插入到分区表时,流式缓冲区中的数据对应的 _PARTITIONTIME 伪列值为 NULL。如需了解数据是否可用于复制,请检查 tables.get 响应中是否存在名为 streamingBuffer 的部分。如果该部分不存在,则数据应可用于复制,且对应的 _PARTITIONTIME 伪列应具有非 NULL 值。此外,您还可利用 streamingBuffer.oldestEntryTime 字段来识别流式缓冲区中记录的存在时间。

尽力去重功能

当您为插入的行提供 insertId 时,BigQuery 使用此 ID 支持尽力去重功能(最长可达一分钟)。也就是说,如果您尝试在此时间段内多次将具有相同 insertId 的同一行流式插入相同的表,则 BigQuery 可能会删除该行多次出现的重复数据,让数据仅出现一次。

在某个分布式系统中,在某些错误情况下(例如系统和 BigQuery 之间发生网络错误或 BigQuery 中发生内部错误)无法确定流式插入的状态,这通常表示您需要重试。如果重试插入操作,请对同一组行使用相同的 insertId,让 BigQuery 能够尝试删除重复的数据。如需了解详情,请参阅排查流式插入问题

BigQuery 提供的是尽力去重功能,不应依赖它作为您的数据中没有重复项的保证机制。此外,BigQuery 随时都有可能降低尽力去重功能的质量,以确保提供更高的数据可靠性和可用性。

如果您有严格的数据去重要求,可选择支持事务Google Cloud Datastore 服务。

停用尽力去重功能

对于每一插入行,您可以通过不填充其 insertId 字段,停用尽力去重功能。如果不填充 insertId,您可以在某些地区获得更高的流式提取配额。这是获得更高的流式提取配额限制的推荐方法。如需了解详情,请参阅配额和限制

Apache Beam 和 Dataflow

如需在使用 Java 版 Apache Beam 的 BigQuery I/O 连接器时停用尽力去重功能,请使用 ignoreInsertIds() 方法

手动移除重复项

您可以通过下列手动过程,确保在流式插入完成后没有重复的行。

  1. insertId 作为列添加到表架构中,并在每行的数据中加入 insertId 值。
  2. 在流式插入停止后,执行下列查询以检查重复项:

    #standardSQL
    SELECT
      MAX(count) FROM(
      SELECT
        ID_COLUMN,
        count(*) as count
      FROM
        `TABLE_NAME`
      GROUP BY
        ID_COLUMN)

    如果结果大于 1,则表示存在重复项。
  3. 要移除重复项,请执行以下查询。应指定目标表、允许大型结果并停用结果扁平化功能。

    #standardSQL
    SELECT
      * EXCEPT(row_number)
    FROM (
      SELECT
        *,
        ROW_NUMBER()
              OVER (PARTITION BY ID_COLUMN) row_number
      FROM
        `TABLE_NAME`)
    WHERE
      row_number = 1

有关重复项移除查询的注意事项:

  • 对于重复项移除查询,较为安全的策略是以新表为目标表。或者,也可以使用写入处置 WRITE_TRUNCATE 来以源表为目标表。
  • 重复项移除查询会在表架构的末尾添加一个值为 1row_number 列。该查询使用标准 SQL 中的 SELECT * EXCEPT 语句将 row_number 列从目标表中排除。#standardSQL 前缀用于为此查询启用标准 SQL。或者,您也可按特定列名称进行选择以忽略此列。
  • 要查询移除了重复项的实时数据,您还可以使用重复项移除查询来经由表创建视图。请注意,将根据视图中所选的列计算视图的查询费用,这可能导致字节扫描大小较大。

将数据流式插入到时间分区表

将数据流式插入到时间分区表时,每个分区都有一个流式缓冲区。如果您将 writeDisposition 属性设置为 WRITE_TRUNCATE,当执行会覆盖分区的加载、查询或复制作业时,系统会保留流式缓冲区。如果您想移除流式缓冲区,请对该分区调用 tables.get 以验证流式缓冲区是否为空。

提取时间分区

流式传输到注入时间分区表时,BigQuery 会根据当前世界协调时间 (UTC) 推断目标分区。

新到达的数据会临时放置在 __UNPARTITIONED__ 分区中,同时存储在流式缓冲区中。当未分区数据累积到足够多时,BigQuery 会将数据进行分区并放置到正确的分区中。查询可通过使用某一伪列([_PARTITIONTIME] 或 [_PARTITIONDATE],具体取决于您的首选数据类型)过滤掉 __UNPARTITIONED__ 分区中的 NULL 值,从查询中排除流式缓冲区中的数据。

如果您要将数据流式插入每日分区表,则可以通过在 insertAll 请求中添加分区修饰器来替换推断出的日期。在 tableId 参数中添加修饰器。例如,您可以使用以下分区修饰器将数据流式插入到 table1 表中与 2021-03-01 对应的分区:

table1$20210301

在使用分区修饰器流式插入数据时,您可以将数据流式插入到日期介于当前日期(基于当前世界协调时间 (UTC))之前 31 天至之后 16 天之间的分区。如需将数据写入到日期在这些允许范围之外的分区,请改为使用加载作业或查询作业,相关说明请参阅对分区表数据执行附加和覆盖操作

只有每日分区表支持使用分区修饰器进行流式插入。每小时、每月或每年分区表不支持此操作。

如需进行测试,您可以使用 bq 命令行工具 bq insert CLI 命令。例如,以下命令会将一行数据流式传输到名为 mydataset.mytable 的分区表中日期为 2017 年 1 月 1 日 ($20170101) 的一个分区内:

echo '{"a":1, "b":2}' | bq insert 'mydataset.mytable$20170101'

时间单位列分区

您可以将数据流式插入到按 DATEDATETIMETIMESTAMP 列分区且列值介于过去 5 年到未来 1 年之间的表。不在此范围内的数据会被拒绝。

流式插入数据时,数据最初放在 __UNPARTITIONED__ 分区中。当未分区的数据累积到足够多时,BigQuery 会自动对数据进行重新分区,并将其放入相应的分区中。

使用模板表自动创建表

模板表提供了一种将逻辑表拆分为多个较小表的机制,以创建较小的数据集(例如,按用户 ID)。模板表有许多限制,如下所述。建议使用分区表聚簇表来实现此行为。

如需通过 BigQuery API 使用模板表,请在 insertAll 请求中添加 templateSuffix 参数。对于 bq 命令行工具,请将 template_suffix 标志添加到 insert 命令。如果 BigQuery 检测到 templateSuffix 参数或 template_suffix 标志,它会将目标表视为基本模板,并创建一个架构与目标表相同且名称中包含指定后缀的新表:

<targeted_table_name> + <templateSuffix>

使用模板表,免去了逐一创建每个表以及为每个表指定架构的麻烦。您只需创建单个模板并提供不同后缀,BigQuery 便可为您创建新表。BigQuery 会将这些表置于同一项目和数据集中。

通过模板表创建的表通常只需几秒即可使用。极少数情况下可能需要更长时间。

更改模板表架构

如果更改模板表架构,则后续生成的所有表都将使用更新的架构。除非现有表仍具有流式缓冲区,否则之前生成的表不受影响。

对于仍具有流式缓冲区的现有表,如果以向后兼容的方式修改模板表架构,则以主动流式插入方式生成的表的架构也会获得更新。但是,如果不以向后兼容的方式修改模板表架构,则所有使用旧架构的缓冲数据都将丢失。此外,如果已生成的现有表使用现已不兼容的旧架构,则您也无法将新的数据流式插入到这些表中。

更改模板表架构后,请等待更改传播完成,然后再尝试插入新的数据或查询生成的表。插入新字段的请求几分钟就会成功。而尝试查询新字段最多可能需要等待 90 分钟。

如果要更改已生成表的架构,则必须等到通过模板表的流式插入停止,并且 tables.get() 响应中没有已生成表的流式插入统计信息部分(表示表中没有缓冲的数据),才能更改架构。

分区表聚簇表不会遇到上述限制,建议您采用这种机制。

模板表详情

模板后缀值
templateSuffix(或 --template_suffix)值只能包含字母(a-z、A-Z)、数字 (0-9) 或下划线 (_)。表名称和表后缀总共不得超过 1024 个字符。
配额
模板表与其他表一样受到类似的流式配额限制。此外,当您将数据流式传输到模板表时,停用尽力去重功能不会提供更高的配额。
生存时间
生成的表会从数据集继承到期时间。与普通的流式插入数据一样,无法立即复制生成的表。
去重功能
去重仅会发生在针对目标表的相同引用之间。 例如,如果同时使用模板表和常规 insertAll 命令将数据流式插入到生成的表,则在通过模板表和常规 insertAll 命令插入的行之间不会发生去重操作。
视图
模板表和生成的表不应为视图。

排查流式插入问题

要了解如何在流式插入期间排查错误,请参阅“排查错误”页面上的排查流式插入问题