将数据流式插入 BigQuery

除了使用作业将数据加载到 BigQuery 外,您还可以使用 tabledata.insertAll 方法,以一次一条记录的方式将数据流式插入到 BigQuery 中。此方法可让您不必等待加载作业,即可查询数据。

本文档讨论了在选择此方法之前要权衡的几项重要因素,包括流式插入配额、数据可用性和数据一致性。

准备工作

  1. 确保您对包含目标表的数据集具有写入权限。除非使用的是模板表,否则在开始向目标表写入数据之前该表就必须存在。要详细了解模板表,请参阅使用模板表自动创建表

  2. 查看流式插入数据的配额政策

  3. 确保您的 Google Cloud Platform 项目已启用结算功能。

    了解如何启用结算功能

    免费层级不支持流式插入功能。如果您在未启用结算功能的情况下尝试使用流式插入,则会收到以下错误:BigQuery: Streaming insert is not allowed in the free tier.

检查数据可用性

在首次将数据流式插入表后的几秒钟内,便可使用流式插入的数据进行实时分析。在极少数情况下(如服务中断),流式缓冲区中的数据可能暂时不可用。当数据不可用时,查询仍可成功运行,只是会跳过仍位于流式缓冲区中的某些数据。这些查询将在 bigquery.jobs.getQueryResultserrors 字段中、对 bigquery.jobs.query 的响应中或 bigquery.jobs.getstatus.errors 字段中包含一个警告。

您最多可能需要等待 90 分钟,才能对数据执行复制和导出操作。此外,在将数据流式插入到分区表时,流式缓冲区中的数据对应的 _PARTITIONTIME 伪列值为 NULL。要了解数据是否可用于复制和导出,请检查 tables.get 响应中是否存在名为 streamingBuffer 的部分。如果该部分不存在,则数据应可用于复制或导出,且对应的 _PARTITIONTIME 伪列应具有非 NULL 值。此外,还可利用 streamingBuffer.oldestEntryTime 字段来识别流式缓冲区中记录的存在时间。

确保数据一致性

为帮助确保数据一致性,可为每个插入的行提供 insertId。BigQuery 至少会在一分钟内记住此 ID。如果您尝试在此时间段内流式插入同一组行,并且已设置 insertId 属性,则 BigQuery 将使用 insertId 属性尽可能地删除重复的数据。

由于在某些错误情况下(例如系统和 BigQuery 之间发生网络错误或 BigQuery 中发生内部错误)无法确定流式插入的状态,您可能必须重试插入。如果要重试插入,请对同一组行使用相同的 insertId,让 BigQuery 能够尝试删除重复的数据。如需了解详情,请参阅排查流式插入问题

在 Google 数据中心意外断开连接的极少数情况下,可能无法自动删除重复的数据。

如果您有更高的数据要求,可选择支持事务Google Cloud Datastore 服务。

停用尽力去重功能

对于插入的每一行,您可以通过不填充其 insertId 字段,停用尽力去重功能。如果不填充 insertId,您会获得更高的针对 US 区域的流式提取配额。如需了解详情,请参阅配额和限制页面。

跨数据位置流式插入数据

您可以将数据流式插入到位于美国和位于欧盟区域的数据集。当 BigQuery 处理 insertAll 请求时,数据可能会流经数据集所在位置以外的机器。如果您要从数据集所在位置以外的某个位置流式插入数据,可能会增加发生延迟和错误的几率。

流式插入到按提取时间分区的表

您可以使用 insertAll 请求将各行数据流式插入到分区表。默认情况下,系统会根据当前日期(基于世界协调时间 (UTC))推断所插入数据对应的目标分区。

如果您要将数据流式插入到按提取时间分区的表,则可以通过在 insertAll 请求中添加分区修饰器来覆盖推断出的日期。例如,您可以使用以下分区修饰器将数据流式插入到表 mydataset.table 中与 2017-03-01 对应的分区:

mydataset.table$20170301

当新传入的数据到达流式缓冲区中时,将被暂时关联至 UNPARTITIONED 分区。因此,查询可通过使用某一伪列([_PARTITIONTIME] 或 [_PARTITIONDATE],具体取决于您的首选数据类型)过滤掉 UNPARTITIONED 分区中的 NULL 值,从查询中排除流式缓冲区中的数据。

在使用分区修饰器流式插入数据时,您可以将数据流式插入到日期介于当前日期(基于当前世界协调时间 (UTC))之前 31 天至之后 16 天之间的分区。要将数据写入到日期在这些允许范围之外的分区,可使用加载作业或查询作业,相关说明请参阅对分区表数据执行附加和覆盖操作

流式插入到分区表

您可以将数据流式插入到按 DATETIMESTAMP 列分区且列值介于过去 1 年到未来 6 个月之间的表。不在此范围内的数据会被拒绝。

流式插入数据时,介于过去 7 天到未来 3 天之间的数据会先被放入流式缓冲区中,然后再提取到相应分区。超出此时段的数据(但日期仍然介于过去 1 年到未来 6 个月之间)会先被放入流式缓冲区中,然后再提取到 UNPARTITIONED 分区。当未分区数据累积到足够多时,会将这些数据加载到相应的分区。

使用模板表自动创建表

将数据流式插入到 BigQuery 的常见使用模式是将逻辑表拆分为许多较小的表,以创建较小的数据集(例如按用户 ID)。要按日期创建较小的数据集,请使用分区表。如果不想按日期创建较小的表,请使用模板表,然后 BigQuery 会为您创建这些表

要通过 BigQuery API 使用模板表,请在 insertAll 请求中添加 templateSuffix 参数。对于 bq 命令行工具,请将 template_suffix 标志添加到 insert 命令。如果 BigQuery 检测到 templateSuffix 参数或 template_suffix 标志,它会将目标表视为基本模板,并创建一个架构与目标表相同且名称中包含指定后缀的新表:

<targeted_table_name> + <templateSuffix>

使用模板表,免去了逐一创建每个表以及为每个表指定架构的麻烦。您只需创建单个模板并提供不同后缀,BigQuery 便可为您创建新表。BigQuery 会将这些表置于同一项目和数据集中。模板还让架构更新变得更为轻松,因为您只需更新模板表即可。

通过模板表创建的表通常只需几秒即可使用。极少数情况下可能需要更多时间。

更改模板表架构

如果更改模板表架构,则后续生成的所有表都将使用更新的架构。除非现有表仍具有流式缓冲区,否则之前生成的表不受影响。

对于仍具有流式缓冲区的现有表,如果以向后兼容的方式修改模板表架构,则以主动流式插入方式生成的表的架构也会获得更新。但是,如果不以向后兼容的方式修改模板表架构,则所有使用旧架构的缓冲数据都将丢失。此外,如果已生成的现有表使用现已不兼容的旧架构,则您也无法将新的数据流式插入到这些表中。

更改模板表架构后,请等待更改传播完成,然后再尝试插入新的数据或查询生成的表。插入新字段的请求几分钟就会成功。而尝试查询新字段最多可能需要等待 90 分钟。

如果要更改已生成的表的架构,请等到通过模板表的流式插入停止,并且 tables.get() 响应中没有已生成表的流式插入统计信息部分(表示表中没有缓冲的数据),否则请勿更改架构。

模板表详情

模板后缀值
templateSuffix(或 --template_suffix)值只能包含字母(a-z、A-Z)、数字 (0-9) 或下划线 (_)。表名称和表后缀总共不得超过 1024 个字符。
配额
无论是基于模板还是手动创建的表,都适用同一配额。
生存时间
生成的表会从数据集继承到期时间。与普通的流式插入数据一样,无法立即复制或导出生成的表。
重复信息删除
仅会在针对目标表的相同引用之间发生重复信息删除。例如,如果同时使用模板表和常规 insertAll 命令将数据流式插入到生成的表,则在通过模板表和常规 insertAll 命令插入的行之间不会发生重复信息删除。
视图
模板表和生成的表不应为视图。

使用情况示例

大量事件日志

如果您有实时收集大量数据的应用,则流式插入会是一个不错的选择。通常,这类应用具有以下标准:

  • 非事务性。 大量、不断附加行。应用可容忍偶尔出现重复项或数据暂时不可用。
  • 汇总分析。 执行查询通常是为了进行趋势分析,而不会选择单条或小范围记录。

事件跟踪就属于一种大量事件日志的情况。假设您有一个跟踪事件的移动应用。您的应用或移动服务器可以独立记录用户交互或系统错误,并将其流式插入到 BigQuery 中。您可分析此数据以判断整体趋势(例如高度交互或问题方面),并实时监控错误情况。

手动移除重复项

您可以通过下列手动过程,确保在流式插入完成后没有重复的行。

  1. insertId 作为列添加到表架构中,并在每行的数据中包含 insertId 值。
  2. 在流式插入停止后,执行下列查询以检查重复项:

    #standardSQL
    SELECT
      MAX(count) FROM(
      SELECT
        ID_COLUMN,
        count(*) as count
      FROM
        `TABLE_NAME`
      GROUP BY
        ID_COLUMN)

    如果结果大于 1,则表示存在重复项。
  3. 要移除重复项,请执行以下查询。应指定目标表、允许大型结果并停用结果扁平化功能。

    #standardSQL
    SELECT
      * EXCEPT(row_number)
    FROM (
      SELECT
        *,
        ROW_NUMBER()
              OVER (PARTITION BY ID_COLUMN) row_number
      FROM
        `TABLE_NAME`)
    WHERE
      row_number = 1

有关重复项移除查询的注意事项:

  • 对于重复项移除查询,较为安全的策略是以新表为目标表。或者,也可以使用写入处置 WRITE_TRUNCATE 来以源表为目标表。
  • 重复项移除查询会在表架构的末尾添加值为 1row_number 列。该查询使用标准 SQL 中的 SELECT * EXCEPT 语句从目标表中排除 row_number 列。#standardSQL 前缀用于为此查询启用标准 SQL。或者,您也可按特定列名称进行选择以忽略此列。
  • 要查询移除了重复项的实时数据,您还可以使用重复项移除查询来经由表创建视图。请注意,将根据视图中所选的列计算视图的查询费用,这可能导致字节扫描大小较大。

排查流式插入问题

要了解如何在流式插入期间排查错误,请参阅“排查错误”页面上的排查流式插入问题

返回页首

流式插入示例

C#

在尝试此示例之前,请先按照《BigQuery 快速入门:使用客户端库》中的 C# 设置说明进行操作。如需了解详情,请参阅 BigQuery C# API 参考文档


using Google.Cloud.BigQuery.V2;
using System;

public class BigQueryTableInsertRows
{
    public void TableInsertRows(
        string projectId = "your-project-id",
        string datasetId = "your_dataset_id",
        string tableId = "your_table_id"
    )
    {
        BigQueryClient client = BigQueryClient.Create(projectId);
        BigQueryInsertRow[] rows = new BigQueryInsertRow[]
        {
            // The insert ID is optional, but can avoid duplicate data
            // when retrying inserts.
            new BigQueryInsertRow(insertId: "row1") {
                { "name", "Washington" },
                { "post_abbr", "WA" }
            },
            new BigQueryInsertRow(insertId: "row2") {
                { "name", "Colorado" },
                { "post_abbr", "CO" }
            }
        };
        client.InsertRows(datasetId, tableId, rows);
    }
}

Go

在尝试此示例之前,请先按照《BigQuery 快速入门:使用客户端库》中的 Go 设置说明进行操作。如需了解详情,请参阅 BigQuery Go API 参考文档

// To run this sample, you will need to create (or reuse) a context and
// an instance of the bigquery client.  For example:
// import "cloud.google.com/go/bigquery"
// ctx := context.Background()
// client, err := bigquery.NewClient(ctx, "your-project-id")
u := client.Dataset(datasetID).Table(tableID).Inserter()
items := []*Item{
	// Item implements the ValueSaver interface.
	{Name: "Phred Phlyntstone", Age: 32},
	{Name: "Wylma Phlyntstone", Age: 29},
}
if err := u.Put(ctx, items); err != nil {
	return err
}

Java

在尝试此示例之前,请先按照《BigQuery 快速入门:使用客户端库》中的 Java 设置说明进行操作。如需了解详情,请参阅 BigQuery Java API 参考文档

TableId tableId = TableId.of(datasetName, tableName);
// Values of the row to insert
Map<String, Object> rowContent = new HashMap<>();
rowContent.put("booleanField", true);
// Bytes are passed in base64
rowContent.put("bytesField", "Cg0NDg0="); // 0xA, 0xD, 0xD, 0xE, 0xD in base64
// Records are passed as a map
Map<String, Object> recordsContent = new HashMap<>();
recordsContent.put("stringField", "Hello, World!");
rowContent.put("recordField", recordsContent);
InsertAllResponse response =
    bigquery.insertAll(
        InsertAllRequest.newBuilder(tableId)
            .addRow(rowContent)
            // More rows can be added in the same RPC by invoking .addRow() on the builder.
            // You can also supply optional unique row keys to support de-duplication scenarios.
            .build());
if (response.hasErrors()) {
  // If any of the insertions failed, this lets you inspect the errors
  for (Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
    // inspect row error
  }
}

Node.js

在尝试此示例之前,请先按照《BigQuery 快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 BigQuery Node.js API 参考文档

// Import the Google Cloud client library
const {BigQuery} = require('@google-cloud/bigquery');

async function insertRowsAsStream() {
  // Inserts the JSON objects into my_dataset:my_table.

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'my_table';
  // const rows = [{name: 'Tom', age: 30}, {name: 'Jane', age: 32}];

  // Create a client
  const bigqueryClient = new BigQuery();

  // Insert data into a table
  await bigqueryClient
    .dataset(datasetId)
    .table(tableId)
    .insert(rows);
  console.log(`Inserted ${rows.length} rows`);
}
insertRowsAsStream();

PHP

在尝试此示例之前,请先按照《BigQuery 快速入门:使用客户端库》中的 PHP 设置说明进行操作。如需了解详情,请参阅 BigQuery PHP API 参考文档

use Google\Cloud\BigQuery\BigQueryClient;

/** Uncomment and populate these variables in your code */
// $projectId = 'The Google project ID';
// $datasetId = 'The BigQuery dataset ID';
// $tableId   = 'The BigQuery table ID';
// $data = [
//     "field1" => "value1",
//     "field2" => "value2",
// ];

// instantiate the bigquery table service
$bigQuery = new BigQueryClient([
    'projectId' => $projectId,
]);
$dataset = $bigQuery->dataset($datasetId);
$table = $dataset->table($tableId);

$insertResponse = $table->insertRows([
    ['data' => $data],
    // additional rows can go here
]);
if ($insertResponse->isSuccessful()) {
    print('Data streamed into BigQuery successfully' . PHP_EOL);
} else {
    foreach ($insertResponse->failedRows() as $row) {
        foreach ($row['errors'] as $error) {
            printf('%s: %s' . PHP_EOL, $error['reason'], $error['message']);
        }
    }
}

Python

在尝试此示例之前,请先按照《BigQuery 快速入门:使用客户端库》中的 Python 设置说明进行操作。如需了解详情,请参阅 BigQuery Python API 参考文档

# TODO(developer): Uncomment the lines below and replace with your values.
# from google.cloud import bigquery
# client = bigquery.Client()
# dataset_id = 'my_dataset'  # replace with your dataset ID
# For this sample, the table must already exist and have a defined schema
# table_id = 'my_table'  # replace with your table ID
# table_ref = client.dataset(dataset_id).table(table_id)
# table = client.get_table(table_ref)  # API request

rows_to_insert = [
    (u'Phred Phlyntstone', 32),
    (u'Wylma Phlyntstone', 29),
]

errors = client.insert_rows(table, rows_to_insert)  # API request

assert errors == []

Ruby

在尝试此示例之前,请先按照《BigQuery 快速入门:使用客户端库》中的 Ruby 设置说明进行操作。如需了解详情,请参阅 BigQuery Ruby API 参考文档

require "google/cloud/bigquery"

def table_insert_rows dataset_id = "your_dataset_id", table_id = "your_table_id"
  bigquery = Google::Cloud::Bigquery.new
  dataset  = bigquery.dataset dataset_id
  table    = dataset.table table_id

  row_data = [
    { name: "Alice", value: 5  },
    { name: "Bob",   value: 10 }
  ]
  response = table.insert row_data

  if response.success?
    puts "Inserted rows successfully"
  else
    puts "Failed to insert #{response.error_rows.count} rows"
  end
end

返回页首

此页内容是否有用?请给出您的反馈和评价:

发送以下问题的反馈:

此网页
需要帮助?请访问我们的支持页面