写入时的汇总值

如果要在写入时聚合 Bigtable 中的数据, 可以使用汇总。汇总是 Bigtable 表格单元格 并汇总单元格的值。添加新值后, 聚合函数会将值与已有的聚合值合并 。其他数据库将类似功能称为计数器分布式计数器

在预览版期间,您可以使用 cbt CLI 和 适用于 C++、Go 和 Java 的 Bigtable 客户端库。

本文档简要介绍了汇总,并展示了如何创建 汇总列族,并提供了一些示例来说明如何将值添加到 汇总单元格。在阅读本文档之前,您应该先熟悉 Bigtable 概览写入 .

何时使用汇总

当您关心某些情况时,Bigtable 聚合非常有用 为实体提供汇总数据,而不是作为单独的数据点。如果您 或者从 Apache Cassandra 等数据库迁移到 Bigtable Redis 支持在 Cloud Storage 存储分区中运行 Bigtable 聚合, 依赖于这些系统中的计数器。

时段

您可以使用时间范围来获取某个时间段的汇总值,例如 一个小时、一天或一周而不是在写入之前或之后汇总数据 则可以添加新值来汇总表格中的单元格。

例如,如果您运营着一项帮助慈善机构筹集资金的服务,那么您可能 想知道每个广告系列每天的在线捐款金额, 不需要知道每笔捐款的确切时间或每小时的金额。在 表格,行键代表慈善机构 ID,然后创建一个汇总列 名为“donations”的家庭群组。该行的列限定符是广告系列 ID。

随着广告系列在特定日期收到的每笔捐款金额都收到了, 该数字会添加到该日期对应的列中汇总单元格的总和中。每次添加 针对单元格的请求使用的时间戳截断到一天的开始,因此 实际上每个请求都具有相同的时间戳。截断时间戳 这样可确保将当天的所有捐款都添加到同一个单元格中。通过 那么从第二天开始,您的所有请求都会转移到新单元格中 截断到新日期,这种模式会保持不变。

根据您的使用场景,您可以选择为新数据 聚合。根据您计划在存储分区中 累积,您可以考虑不同的行键设计。

如需详细了解时间段,请参阅时序的架构设计 数据

简化工作流程

通过汇总,您可以在 Bigtable 表中聚合数据 而无需使用任何 ETL 或流处理软件来汇总 将数据写入 Bigtable 之前或之后。例如: 如果您的应用之前将消息发布到 Pub/Sub 之前使用 Dataflow 读取消息并汇总数据, 将数据写入 Bigtable,则可以直接将数据 来汇总 Bigtable 中的单元格。

汇总列族

要创建和更新汇总单元格,您必须拥有一个或多个汇总 表格中的列族 – 仅包含汇总表格的列族 单元格。您可以在创建表格时进行创建,也可以添加汇总报告 列族添加到已在使用的表。创建列时 系列,请指定汇总类型,例如求和。

您无法将包含非汇总数据的列族转换为 汇总列族。汇总列族中的列不能包含 非汇总单元格和标准列族不能包含汇总单元格。

要创建包含汇总列族的新表,请参阅创建 表格。添加汇总列 表,请参阅添加列 家庭

汇总类型

Bigtable 支持以下聚合类型:

  • sum.当您向汇总汇总单元格添加值时,单元格的值将为 替换为新添加的值与当前单元格值的总和。 支持求和的输入类型为 Int64
  • min.当您向最小汇总单元格添加值时,单元格的值将为 替换为新添加值与当前值之间的较低值 单元格的值。最小值支持的输入类型为 Int64
  • max.当您向一个最大汇总单元格中添加值时,单元格的值将为 替换为新添加值与当前值之间的较高值 单元格的值。支持的输入类型为 Int64

时间戳

聚合单元格由行键、列族、列限定符和 时间戳。每次向单元格添加数据时,您都使用相同的时间戳。如果您 向同一个行键、列族和列限定符发送一个值,但发送一个 不同的时间戳,系统会在列中创建一个新的汇总单元。

发送到汇总单元格的添加请求必须包含时间戳。

输入类型

添加请求中值的输入类型必须与 创建的列族时所使用的名称例如,如果您将字符串值发送到 列族,则请求会被拒绝。Int64

AddToCell

添加请求会在 Bigtable Data API 中发送一项 AddToCell 变更。 相比之下,非汇总写入请求会发送 SetCell 变更。有关 请参阅 Data API 参考文档 .AddToCell 操作执行相同的操作 限制 其他表更改操作

在复制表中,聚合单元格会收敛于 当前复制延迟内的所有集群。总价值是 自以下时间以来发送到该单元的所有 AddToCell 项变更的汇总 上次删除操作或自单元格创建后删除的日期。

添加请求示例

以下示例展示了如何向汇总单元格添加值。通过 样本用于对需要输入类型 Int64 的列族中的求和相加。

cbt

cbt addtocell TABLE_ID ROW_KEY FAMILY_NAME:COLUMN_QUALIFER=VALUE@TIMESTAMP

替换以下内容:

  • TABLE_ID:表的永久标识符
  • ROW_KEY:行键
  • FAMILY_NAME:聚合列族的名称
  • COLUMN_QUALIFIER:列的标识符
  • VALUE:要添加到单元格的值
  • TIMESTAMP:以微秒为单位的 Unix 时间戳,例如 1710868850000000

示例:

cbt addtocell mobile-data device-1 updates:week12=100@1710868850000000

Go

如需了解如何安装和使用 Bigtable 客户端库,请参阅 Bigtable 客户端库

如需向 Bigtable 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/bigtable"
)

func writeAggregate(w io.Writer, projectID, instanceID string, tableName string) error {
	// projectID := "my-project-id"
	// instanceID := "my-instance-id"
	// tableName := "mobile-time-series"

	ctx := context.Background()
	client, err := bigtable.NewClient(ctx, projectID, instanceID)
	if err != nil {
		return fmt.Errorf("bigtable.NewClient: %w", err)
	}
	defer client.Close()
	tbl := client.Open(tableName)
	columnFamilyName := "view_count"
	viewTimestamp, err := time.Parse(time.RFC3339, "2024-03-13T12:41:34Z")
	if err != nil {
		return err
	}
	hourlyBucket := viewTimestamp.Truncate(time.Hour)

	mut := bigtable.NewMutation()
	mut.AddIntToCell(columnFamilyName, "views", bigtable.Time(hourlyBucket), 1)

	rowKey := "page#index.html"
	if err := tbl.Apply(ctx, rowKey, mut); err != nil {
		return fmt.Errorf("Apply: %w", err)
	}

	fmt.Fprintf(w, "Successfully wrote row: %s\n", rowKey)
	return nil
}

Java

如需了解如何安装和使用 Bigtable 客户端库,请参阅 Bigtable 客户端库

如需向 Bigtable 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证


import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class WriteAggregate {
  private static final String COUNT_COLUMN_FAMILY_NAME = "view_count";
  private static final long MICROS_PER_MILLI = 1000;

  public static void writeAggregate(String projectId, String instanceId, String tableId) {
    // String projectId = "my-project-id";
    // String instanceId = "my-instance-id";
    // String tableId = "page-view-counter";

    try (BigtableDataClient dataClient = BigtableDataClient.create(projectId, instanceId)) {

      String rowKey = "page#index.html";
      Instant viewTimestamp = Instant.parse("2024-03-13T12:41:34.123Z");

      // Bucket the views for an hour into a single count, giving us an hourly view count for a
      // given page.
      Instant hourlyBucket = viewTimestamp.truncatedTo(ChronoUnit.HOURS);
      long hourlyBucketMicros = hourlyBucket.toEpochMilli() * MICROS_PER_MILLI;

      RowMutation rowMutation =
          RowMutation.create(tableId, rowKey)
              .addToCell(COUNT_COLUMN_FAMILY_NAME, "views", hourlyBucketMicros, 1);

      dataClient.mutateRow(rowMutation);
      System.out.printf("Successfully wrote row %s", rowKey);

    } catch (Exception e) {
      System.out.println("Error during WriteAggregate: \n" + e.toString());
    }
  }
}

后续步骤