将数据流式插入 BigQuery

您可以选择使用 tabledata.insertAll 方法,每次将一条数据记录流式插入 BigQuery,而不必使用作业将数据加载到 BigQuery 中。 这种方法可让您在查询数据时避免因运行加载作业而产生的延迟。

本文档讨论了在选择此方法之前要权衡的几项重要因素,包括流式插入配额、数据可用性和数据一致性。

准备工作

  1. 确保您对包含目标表的数据集具有写入权限。除非使用的是模板表,否则在开始向目标表写入数据之前该表就必须存在。如需详细了解模板表,请参阅使用模板表自动创建表

  2. 查看流式插入数据的配额政策

  3. 确保您的 Google Cloud 项目已启用结算功能。 了解如何确认您的项目已启用结算功能

免费层级不支持流式插入功能。 如果您在未启用结算功能的情况下尝试使用流式插入,则会收到以下错误:BigQuery: Streaming insert is not allowed in the free tier.

检查数据可用性

在首次将数据流式插入表后的几秒钟内,便可使用流式插入的数据进行实时分析。在极少数情况下(如服务中断),流式缓冲区中的数据可能暂时不可用。当数据不可用时,查询仍可成功运行,只是会跳过仍位于流式缓冲区中的某些数据。这些查询将在 bigquery.jobs.getQueryResultserrors 字段中、对 bigquery.jobs.query 的响应中或 bigquery.jobs.getstatus.errors 字段中包含一个警告。

您最多可能需要等待 90 分钟,才能对数据执行复制操作。此外,在将数据流式插入到分区表时,流式缓冲区中的数据对应的 _PARTITIONTIME 伪列值为 NULL。如需了解数据是否可用于复制,请检查 tables.get 响应中是否存在名为 streamingBuffer 的部分。如果该部分不存在,则数据应可用于复制,且对应的 _PARTITIONTIME 伪列应具有非 NULL 值。此外,您还可利用 streamingBuffer.oldestEntryTime 字段来识别流式缓冲区中记录的存在时间。

尽力去重功能

当您为插入的行提供 insertId 时,BigQuery 使用此 ID 支持尽力去重功能(最长可达一分钟)。也就是说,如果您尝试在此时间段内多次将具有相同 insertId 的同一行流式插入相同的表,则 BigQuery 可能会删除该行多次出现的重复数据,让数据仅出现一次。

在某个分布式系统中,在某些错误情况下(例如系统和 BigQuery 之间发生网络错误或 BigQuery 中发生内部错误)无法确定流式插入的状态,这通常表示您需要重试。如果重试插入操作,请对同一组行使用相同的 insertId,让 BigQuery 能够尝试删除重复的数据。如需了解详情,请参阅排查流式插入问题

BigQuery 提供的是尽力去重功能,不应依赖它作为您的数据中没有重复项的保证机制。此外,BigQuery 随时都有可能降低尽力去重功能的质量,以确保提供更高的数据可靠性和可用性。

如果您有严格的数据去重要求,可选择支持事务Google Cloud Datastore 服务。

停用尽力去重功能

对于每一插入行,您可以通过不填充其 insertId 字段,停用尽力去重功能。如果不填充 insertId,您可以在某些地区获得更高的流式提取配额。这是获得更高的流式提取配额限制的推荐方法。如需了解详情,请参阅配额和限制

Apache Beam 和 Dataflow

如需在使用 Java 版 Apache Beam 的 BigQuery I/O 连接器时停用尽力去重功能,请使用 ignoreInsertIds() 方法

跨数据位置流式插入数据

您可以将数据同时流式插入到位于美国和欧盟区域的数据集。当 BigQuery 处理 insertAll 请求时,数据可能流经数据集所在位置以外的机器。如果您要从数据集所在位置以外的某个位置流式插入数据,可能会增加发生延迟和错误的几率。

流式插入到按提取时间分区的表

您可以使用 insertAll 请求将单行数据流式插入到分区表中。默认情况下,系统会根据当前日期(基于世界协调时间 (UTC))推断所插入数据对应的目标分区。

如果您要将数据流式插入到提取时间分区表,则可以通过在 insertAll 请求中添加分区修饰器来替换推断出的日期。例如,您可以使用以下分区修饰器将数据流式插入到 mydataset.table 表中与 2017-03-01 对应的分区:

mydataset.table$20170301

当新传入的数据到达流式缓冲区中时,将被暂时关联至 UNPARTITIONED 分区。因此,查询可通过使用某一伪列([_PARTITIONTIME] 或 [_PARTITIONDATE],具体取决于您的首选数据类型)过滤掉 UNPARTITIONED 分区中的 NULL 值,从查询中排除流式缓冲区中的数据。

在使用分区修饰器流式插入数据时,您可以将数据流式插入到日期介于当前日期(基于当前世界协调时间 (UTC))之前 31 天至之后 16 天之间的分区。如需将数据写入到日期在这些允许范围之外的分区,可使用加载作业或查询作业,相关说明请参阅对分区表数据执行附加和覆盖操作

流式插入到分区表

您可以将数据流式插入到按 DATETIMESTAMP 列分区且列值介于过去 5 年到未来 1 年之间的表。不在此范围内的数据会被拒绝。

流式插入数据时,首先将其提取到 UNPARTITIONED 分区。当未分区数据累积到足够多时,系统会将其分区到相应的分区。

使用模板表自动创建表

模板表提供了一种将逻辑表拆分为多个较小表的机制,以创建较小的数据集(例如,按用户 ID)。模板表有许多限制,如下所述。建议使用分区表聚簇表来实现此行为。

如需通过 BigQuery API 使用模板表,请在 insertAll 请求中添加 templateSuffix 参数。对于 bq 命令行工具,请将 template_suffix 标志添加到 insert 命令。如果 BigQuery 检测到 templateSuffix 参数或 template_suffix 标志,它会将目标表视为基本模板,并创建一个架构与目标表相同且名称中包含指定后缀的新表:

<targeted_table_name> + <templateSuffix>

使用模板表,免去了逐一创建每个表以及为每个表指定架构的麻烦。您只需创建单个模板并提供不同后缀,BigQuery 便可为您创建新表。BigQuery 会将这些表置于同一项目和数据集中。

通过模板表创建的表通常只需几秒即可使用。极少数情况下可能需要更长时间。

更改模板表架构

如果更改模板表架构,则后续生成的所有表都将使用更新的架构。除非现有表仍具有流式缓冲区,否则之前生成的表不受影响。

对于仍具有流式缓冲区的现有表,如果以向后兼容的方式修改模板表架构,则以主动流式插入方式生成的表的架构也会获得更新。但是,如果不以向后兼容的方式修改模板表架构,则所有使用旧架构的缓冲数据都将丢失。此外,如果已生成的现有表使用现已不兼容的旧架构,则您也无法将新的数据流式插入到这些表中。

更改模板表架构后,请等待更改传播完成,然后再尝试插入新的数据或查询生成的表。插入新字段的请求几分钟就会成功。而尝试查询新字段最多可能需要等待 90 分钟。

如果要更改已生成表的架构,则必须等到通过模板表的流式插入停止,并且 tables.get() 响应中没有已生成表的流式插入统计信息部分(表示表中没有缓冲的数据),才能更改架构。

分区表聚簇表不会遇到上述限制,建议您采用这种机制。

模板表详情

模板后缀值
templateSuffix(或 --template_suffix)值只能包含字母(a-z、A-Z)、数字 (0-9) 或下划线 (_)。表名称和表后缀总共不得超过 1024 个字符。
配额
模板表与其他表一样受到类似的流式配额限制。此外,当您将数据流式传输到模板表时,停用尽力去重功能不会提供更高的配额。
生存时间
生成的表会从数据集继承到期时间。与普通的流式插入数据一样,无法立即复制或导出生成的表。
去重功能
去重仅会发生在针对目标表的相同引用之间。 例如,如果同时使用模板表和常规 insertAll 命令将数据流式插入到生成的表,则在通过模板表和常规 insertAll 命令插入的行之间不会发生去重操作。
视图
模板表和生成的表不应为视图。

使用情况示例

大量事件日志

如果您有实时收集大量数据的应用,则流式插入会是一个不错的选择。通常,这类应用具有以下标准:

  • 非事务性。大量、不断附加行。应用可容忍偶尔出现重复项或数据暂时不可用。
  • 汇总分析。执行查询通常是为了进行趋势分析,而不会选择单条或小范围记录。

事件跟踪就属于一种大量事件日志的情况。假设您有一个跟踪事件的移动应用。您的应用或移动服务器可以独立记录用户交互或系统错误,并将其流式插入到 BigQuery 中。您可分析此数据以判断整体趋势(例如高度交互或问题方面),并实时监控错误情况。

手动移除重复项

您可以通过下列手动过程,确保在流式插入完成后没有重复的行。

  1. insertId 作为列添加到表架构中,并在每行的数据中加入 insertId 值。
  2. 在流式插入停止后,执行下列查询以检查重复项:

    #standardSQL
    SELECT
      MAX(count) FROM(
      SELECT
        ID_COLUMN,
        count(*) as count
      FROM
        `TABLE_NAME`
      GROUP BY
        ID_COLUMN)

    如果结果大于 1,则表示存在重复项。
  3. 要移除重复项,请执行以下查询。应指定目标表、允许大型结果并停用结果扁平化功能。

    #standardSQL
    SELECT
      * EXCEPT(row_number)
    FROM (
      SELECT
        *,
        ROW_NUMBER()
              OVER (PARTITION BY ID_COLUMN) row_number
      FROM
        `TABLE_NAME`)
    WHERE
      row_number = 1

有关重复项移除查询的注意事项:

  • 对于重复项移除查询,较为安全的策略是以新表为目标表。或者,也可以使用写入处置 WRITE_TRUNCATE 来以源表为目标表。
  • 重复项移除查询会在表架构的末尾添加一个值为 1row_number 列。该查询使用标准 SQL 中的 SELECT * EXCEPT 语句将 row_number 列从目标表中排除。#standardSQL 前缀用于为此查询启用标准 SQL。或者,您也可按特定列名称进行选择以忽略此列。
  • 要查询移除了重复项的实时数据,您还可以使用重复项移除查询来经由表创建视图。请注意,将根据视图中所选的列计算视图的查询费用,这可能导致字节扫描大小较大。

排查流式插入问题

要了解如何在流式插入期间排查错误,请参阅“排查错误”页面上的排查流式插入问题

返回页首

流式插入示例

C#

在尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 C# 设置说明进行操作。如需了解详情,请参阅 BigQuery C# API 参考文档


using Google.Cloud.BigQuery.V2;
using System;

public class BigQueryTableInsertRows
{
    public void TableInsertRows(
        string projectId = "your-project-id",
        string datasetId = "your_dataset_id",
        string tableId = "your_table_id"
    )
    {
        BigQueryClient client = BigQueryClient.Create(projectId);
        BigQueryInsertRow[] rows = new BigQueryInsertRow[]
        {
            // The insert ID is optional, but can avoid duplicate data
            // when retrying inserts.
            new BigQueryInsertRow(insertId: "row1") {
                { "name", "Washington" },
                { "post_abbr", "WA" }
            },
            new BigQueryInsertRow(insertId: "row2") {
                { "name", "Colorado" },
                { "post_abbr", "CO" }
            }
        };
        client.InsertRows(datasetId, tableId, rows);
    }
}

Go

在尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Go 设置说明进行操作。如需了解详情,请参阅 BigQuery Go API 参考文档

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// Item represents a row item.
type Item struct {
	Name string
	Age  int
}

// Save implements the ValueSaver interface.
func (i *Item) Save() (map[string]bigquery.Value, string, error) {
	return map[string]bigquery.Value{
		"full_name": i.Name,
		"age":       i.Age,
	}, "", nil
}

// insertRows demonstrates inserting data into a table using the streaming insert mechanism.
func insertRows(projectID, datasetID, tableID string) error {
	// projectID := "my-project-id"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	inserter := client.Dataset(datasetID).Table(tableID).Inserter()
	items := []*Item{
		// Item implements the ValueSaver interface.
		{Name: "Phred Phlyntstone", Age: 32},
		{Name: "Wylma Phlyntstone", Age: 29},
	}
	if err := inserter.Put(ctx, items); err != nil {
		return err
	}
	return nil
}

Java

试用此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 BigQuery Java API 参考文档

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sample to inserting rows into a table without running a load job.
public class TableInsertRows {

  public static void runTableInsertRows() {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    // Create a row to insert
    Map<String, Object> rowContent = new HashMap<>();
    rowContent.put("booleanField", true);
    rowContent.put("numericField", "3.14");

    tableInsertRows(datasetName, tableName, rowContent);
  }

  public static void tableInsertRows(
      String datasetName, String tableName, Map<String, Object> rowContent) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      // Get table
      TableId tableId = TableId.of(datasetName, tableName);

      // Inserts rowContent into datasetName:tableId.
      InsertAllResponse response =
          bigquery.insertAll(
              InsertAllRequest.newBuilder(tableId)
                  // More rows can be added in the same RPC by invoking .addRow() on the builder.
                  // You can also supply optional unique row keys to support de-duplication
                  // scenarios.
                  .addRow(rowContent)
                  .build());

      if (response.hasErrors()) {
        // If any of the insertions failed, this lets you inspect the errors
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          System.out.println("Response error: \n" + entry.getValue());
        }
      }
      System.out.println("Rows successfully inserted into table");
    } catch (BigQueryException e) {
      System.out.println("Insert operation not performed \n" + e.toString());
    }
  }
}

Node.js

在尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 BigQuery Node.js API 参考文档

// Import the Google Cloud client library
const {BigQuery} = require('@google-cloud/bigquery');

async function insertRowsAsStream() {
  // Inserts the JSON objects into my_dataset:my_table.

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'my_table';
  // const rows = [{name: 'Tom', age: 30}, {name: 'Jane', age: 32}];

  // Create a client
  const bigqueryClient = new BigQuery();

  // Insert data into a table
  await bigqueryClient
    .dataset(datasetId)
    .table(tableId)
    .insert(rows);
  console.log(`Inserted ${rows.length} rows`);
}
insertRowsAsStream();

PHP

在尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 PHP 设置说明进行操作。如需了解详情,请参阅 BigQuery PHP API 参考文档

use Google\Cloud\BigQuery\BigQueryClient;

/** Uncomment and populate these variables in your code */
// $projectId = 'The Google project ID';
// $datasetId = 'The BigQuery dataset ID';
// $tableId   = 'The BigQuery table ID';
// $data = [
//     "field1" => "value1",
//     "field2" => "value2",
// ];

// instantiate the bigquery table service
$bigQuery = new BigQueryClient([
    'projectId' => $projectId,
]);
$dataset = $bigQuery->dataset($datasetId);
$table = $dataset->table($tableId);

$insertResponse = $table->insertRows([
    ['data' => $data],
    // additional rows can go here
]);
if ($insertResponse->isSuccessful()) {
    print('Data streamed into BigQuery successfully' . PHP_EOL);
} else {
    foreach ($insertResponse->failedRows() as $row) {
        foreach ($row['errors'] as $error) {
            printf('%s: %s' . PHP_EOL, $error['reason'], $error['message']);
        }
    }
}

Python

在尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Python 设置说明进行操作。如需了解详情,请参阅 BigQuery Python API 参考文档

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {u"full_name": u"Phred Phlyntstone", u"age": 32},
    {u"full_name": u"Wylma Phlyntstone", u"age": 29},
]

errors = client.insert_rows_json(table_id, rows_to_insert)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

Ruby

在尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Ruby 设置说明进行操作。如需了解详情,请参阅 BigQuery Ruby API 参考文档

require "google/cloud/bigquery"

def table_insert_rows dataset_id = "your_dataset_id", table_id = "your_table_id"
  bigquery = Google::Cloud::Bigquery.new
  dataset  = bigquery.dataset dataset_id
  table    = dataset.table table_id

  row_data = [
    { name: "Alice", value: 5  },
    { name: "Bob",   value: 10 }
  ]
  response = table.insert row_data

  if response.success?
    puts "Inserted rows successfully"
  else
    puts "Failed to insert #{response.error_rows.count} rows"
  end
end

插入行时无需填充 insertId 字段。以下示例展示了如何避免在流式传输时为每行发送 insertId

Java

试用此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 BigQuery Java API 参考文档

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sample to insert rows without row ids in a table
public class TableInsertRowsWithoutRowIds {

  public static void runTableInsertRowsWithoutRowIds() {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    // Create rows to insert
    Map<String, Object> rowContent1 = new HashMap<>();
    rowContent1.put("stringField", "Phred Phlyntstone");
    rowContent1.put("numericField", 32);
    Map<String, Object> rowContent2 = new HashMap<>();
    rowContent2.put("stringField", "Wylma Phlyntstone");
    rowContent2.put("numericField", 29);
    List<InsertAllRequest.RowToInsert> rowContent = new ArrayList<>();
    // insertId is null if not specified
    rowContent.add(InsertAllRequest.RowToInsert.of(rowContent1));
    rowContent.add(InsertAllRequest.RowToInsert.of(rowContent2));
    tableInsertRowsWithoutRowIds(datasetName, tableName, rowContent);
  }

  public static void tableInsertRowsWithoutRowIds(
      String datasetName, String tableName, Iterable<InsertAllRequest.RowToInsert> rows) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      // Get table
      TableId tableId = TableId.of(datasetName, tableName);

      // Inserts rowContent into datasetName:tableId.
      InsertAllResponse response =
          bigquery.insertAll(InsertAllRequest.newBuilder(tableId).setRows(rows).build());

      if (response.hasErrors()) {
        // If any of the insertions failed, this lets you inspect the errors
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          System.out.println("Response error: \n" + entry.getValue());
        }
      }
      System.out.println("Rows successfully inserted into table without row ids");
    } catch (BigQueryException e) {
      System.out.println("Insert operation not performed \n" + e.toString());
    }
  }
}

Python

在尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Python 设置说明进行操作。如需了解详情,请参阅 BigQuery Python API 参考文档

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {u"full_name": u"Phred Phlyntstone", u"age": 32},
    {u"full_name": u"Wylma Phlyntstone", u"age": 29},
]

errors = client.insert_rows_json(
    table_id, rows_to_insert, row_ids=[None] * len(rows_to_insert)
)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

返回页首