写入时的汇总值

如果您想在写入时在 Bigtable 中对数据进行汇总,可以使用汇总。汇总是 Bigtable 表单元格, 并汇总单元格的值。当您添加新值时,汇总函数会将该值与单元格中已有的汇总值合并。其他数据库将类似功能称为计数器分布式计数器

您可以使用 cbt CLI 和 适用于 C++、Go 和 Java 的 Bigtable 客户端库。

本文档简要介绍了汇总,展示了如何创建汇总列族,并提供了示例来展示如何向汇总单元格添加值。在阅读本文档之前,您应先熟悉 Bigtable 概览写入

何时使用汇总

如果您只关心实体的汇总数据,而不是单个数据点,Bigtable 汇总非常有用。如果您 或者从 Apache Cassandra 等数据库迁移到 Bigtable Redis 支持在 Cloud Storage 存储分区中运行 Bigtable 聚合, 依赖于这些系统中的计数器。

时段

您可以使用时间范围来获取某个时间段的汇总值,例如 一个小时、一天或一周您可以将新值添加到表格中的汇总单元格,而不是在数据写入表格之前或之后对数据进行汇总。

例如,如果您提供帮助慈善机构筹款的服务,则可能需要了解每个活动每天的在线捐款金额,但不需要知道每次捐款的确切时间或每小时的金额。在 表格,行键代表慈善机构 ID,然后创建一个汇总列 名为“donations”的家庭群组。该行中的列限定符是广告系列 ID。

广告系列在指定日期收到的每笔捐款金额都收到后, 该数字会添加到该日期对应的列中汇总单元格的总和中。对该单元格发出的每个添加请求都会使用截断到当天开始时间的时间戳,因此实际上每个请求都具有相同的时间戳。截断时间戳 确保将当天的所有捐款都添加到同一个单元格中。通过 那么从第二天开始,您的所有请求都会转移到新单元格中 截断到新日期,这种模式会保持不变。

根据您的用例,您可以选择改为为新的汇总创建新列。根据您计划在存储分区中 累积,您可以考虑不同的行键设计。

如需详细了解时间段,请参阅时序的架构设计 数据

简化工作流程

通过汇总,您可以在 Bigtable 表中聚合数据 而无需使用任何 ETL 或流处理软件来汇总 将数据写入 Bigtable 之前或之后。例如: 如果您的应用之前将消息发布到 Pub/Sub 之前使用 Dataflow 读取消息并汇总数据, 将数据写入 Bigtable,则可以直接将数据 来汇总 Bigtable 中的单元格。

汇总列族

要创建和更新汇总单元格,您必须拥有一个或多个汇总 表格中的列族 – 仅包含汇总表格的列族 单元格。您可以在创建表时创建汇总列族,也可以向正在使用的表添加汇总列族。创建列族时,您需要指定汇总类型,例如求和。

您无法将包含非汇总数据的列族转换为 汇总列族。汇总列族中的列不能包含 非汇总单元格和标准列族不能包含汇总单元格。

要创建包含汇总列族的新表,请参阅创建 表格。添加汇总列 表,请参阅添加列 家庭

汇总类型

Bigtable 支持以下聚合类型:

总和

当您向汇总的汇总单元格 (sum) 中添加值时,单元格的值将为 替换为新添加的值与当前单元格值的总和。对和支持的输入类型为 Int64

下限

当您向最小汇总单元格 (min) 添加值时,单元格的值将为 替换为新添加值与当前值之间的较低值 单元格的值。最小值支持的输入类型为 Int64

最大值

当您向一个最大汇总单元格 (max) 添加值后,该单元格的值将为 替换为新添加值与当前值之间的较高值 单元格的值。支持的“max”输入类型为 Int64

HyperLogLog (HLL)

当您向 HLL 汇总单元格 (inthll) 添加值时,该值会添加到 自上次重置(即 创建单元格或删除单元格数据)。此单元格的值表示 该数据集的状态。有关 HLL 算法的更多常规信息,请参阅 HyperLogLog.

您可以使用 Zetasketch 库读取 HLL 值。如需了解详情,请参阅 Zetasketch GitHub 代码库中。通过 HLL 支持的输入类型为 BYTES

时间戳

聚合单元格由行键、列族、列限定符和 时间戳。每次向单元格添加数据时,您都使用相同的时间戳。如果您 向同一个行键、列族和列限定符发送一个值,但发送一个 不同的时间戳,系统会在列中创建一个新的汇总单元。

发送到汇总单元格的添加请求必须包含时间戳。

输入类型

添加请求中值的输入类型必须与 创建的列族时所使用的名称例如,如果您将字符串值发送到 列族,则请求会被拒绝。Int64

AddToCell

添加请求会在 Bigtable Data API 中发送 AddToCell 更改。相比之下,非汇总写入请求会发送 SetCell 更改。如需了解详情,请参阅 Data API 参考文档AddToCell 操作执行相同的操作 限制 与表的其他更改一样

在复制表中,聚合单元格会收敛于 当前复制延迟内的所有集群。最终的值是 自以下时间以来,发送到该单元的所有 AddToCell 项变更的汇总 上次删除操作或自单元格创建后删除的日期。

垃圾回收

在垃圾回收期间,汇总单元格会被视为任何其他单元格:如果将单元格标记为删除,则删除操作会复制到实例中的所有集群。如需了解详情,请参阅复制和垃圾回收。如果向已被垃圾回收移除的汇总单元格发送添加请求,系统会创建新的汇总单元格。

添加请求示例

以下示例展示了如何向汇总单元格添加值。通过 样本用于对需要输入类型 Int64 的列族中的求和相加。

cbt

cbt addtocell TABLE_ID ROW_KEY FAMILY_NAME:COLUMN_QUALIFER=VALUE@TIMESTAMP

替换以下内容:

  • TABLE_ID:表的永久标识符
  • ROW_KEY:行键
  • FAMILY_NAME:聚合列族的名称
  • COLUMN_QUALIFIER:列的标识符
  • VALUE:要添加到单元格的值
  • TIMESTAMP:以微秒为单位的 Unix 时间戳,例如 1710868850000000

示例:

cbt addtocell mobile-data device-1 updates:week12=100@1710868850000000

Go

如需了解如何安装和使用 Bigtable 的客户端库,请参阅 Bigtable 客户端库

如需向 Bigtable 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/bigtable"
)

func writeAggregate(w io.Writer, projectID, instanceID string, tableName string) error {
	// projectID := "my-project-id"
	// instanceID := "my-instance-id"
	// tableName := "mobile-time-series"

	ctx := context.Background()
	client, err := bigtable.NewClient(ctx, projectID, instanceID)
	if err != nil {
		return fmt.Errorf("bigtable.NewClient: %w", err)
	}
	defer client.Close()
	tbl := client.Open(tableName)
	columnFamilyName := "view_count"
	viewTimestamp, err := time.Parse(time.RFC3339, "2024-03-13T12:41:34Z")
	if err != nil {
		return err
	}
	hourlyBucket := viewTimestamp.Truncate(time.Hour)

	mut := bigtable.NewMutation()
	mut.AddIntToCell(columnFamilyName, "views", bigtable.Time(hourlyBucket), 1)

	rowKey := "page#index.html"
	if err := tbl.Apply(ctx, rowKey, mut); err != nil {
		return fmt.Errorf("Apply: %w", err)
	}

	fmt.Fprintf(w, "Successfully wrote row: %s\n", rowKey)
	return nil
}

Java

如需了解如何安装和使用 Bigtable 客户端库,请参阅 Bigtable 客户端库

如需向 Bigtable 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证


import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class WriteAggregate {
  private static final String COUNT_COLUMN_FAMILY_NAME = "view_count";
  private static final long MICROS_PER_MILLI = 1000;

  public static void writeAggregate(String projectId, String instanceId, String tableId) {
    // String projectId = "my-project-id";
    // String instanceId = "my-instance-id";
    // String tableId = "page-view-counter";

    try (BigtableDataClient dataClient = BigtableDataClient.create(projectId, instanceId)) {

      String rowKey = "page#index.html";
      Instant viewTimestamp = Instant.parse("2024-03-13T12:41:34.123Z");

      // Bucket the views for an hour into a single count, giving us an hourly view count for a
      // given page.
      Instant hourlyBucket = viewTimestamp.truncatedTo(ChronoUnit.HOURS);
      long hourlyBucketMicros = hourlyBucket.toEpochMilli() * MICROS_PER_MILLI;

      RowMutation rowMutation =
          RowMutation.create(tableId, rowKey)
              .addToCell(COUNT_COLUMN_FAMILY_NAME, "views", hourlyBucketMicros, 1);

      dataClient.mutateRow(rowMutation);
      System.out.printf("Successfully wrote row %s", rowKey);

    } catch (Exception e) {
      System.out.println("Error during WriteAggregate: \n" + e.toString());
    }
  }
}

后续步骤