写入时汇总值

如果要在写入时在 Bigtable 中汇总数据,您可以使用汇总。聚合是 Bigtable 表单元,会在写入数据时对单元格值进行聚合。添加新值时,聚合函数会将该值与单元格中已有的聚合值合并。其他数据库将类似功能称为计数器分布式计数器

Bigtable 提供的聚合类型为 sum。当您向求和汇总单元格添加值时,相应单元格的值会被替换为新添加的值与当前单元格值的总和。

在预览版期间,您可以使用 cbt CLI 和适用于 C++、Go 和 Java 的 Bigtable 客户端库处理聚合。

本文档简要介绍了聚合,展示了如何创建聚合列族,并提供了有关如何向聚合单元格添加值的示例。在阅读本文档之前,您应该先熟悉 Bigtable 概览写入

何时使用汇总

如果您关注的是实体的汇总数据(而不是单个数据点),那么 Bigtable 聚合会非常有用。如果要从 Apache Cassandra 或 Redis 等数据库迁移到 Bigtable,则可以在以前依赖于这些系统中的计数器的位置使用 Bigtable 聚合。

时段

您可以使用时间段获取一段时间(例如一小时、一天或一周)的聚合值。您可以向表中的聚合单元格添加新值,而不是在将数据写入表之前或之后汇总数据。

例如,如果您运营着一项帮助慈善机构筹集的服务,您可能想要知道每个广告系列每天的在线捐款金额,但并不需要知道每笔捐款的确切时间或每小时的金额。在表中,行键表示慈善机构 ID,您创建一个名为 donations 的聚合列族。该行中的列限定符是广告系列 ID。

在收到某个广告系列在某一天收到的每一笔捐款后,这笔捐款会添加到当天列内汇总单元格的总和中。针对单元格的每个添加请求使用的时间戳都截断到当天的开始,因此实际上每个请求都具有相同的时间戳。截断时间戳可确保当天的所有捐赠都会添加到同一单元格中。第二天,您的所有请求都会进入一个新单元格,并使用截断至新日期的时间戳,这种模式会持续下去。

根据您的用例,您可能会选择为新的汇总创建新列。如需详细了解时间段,请参阅时序数据的架构设计

简化工作流程

借助汇总功能,您可以在 Bigtable 表中对数据进行汇总,而无需在将数据写入 Bigtable 之前或之后使用任何 ETL 或流式处理软件对数据进行汇总。例如,如果您的应用之前已将消息发布到 Pub/Sub,然后使用 Dataflow 读取消息并汇总数据,再将其写入 Bigtable,那么您可以改为直接将数据发送到 Bigtable 中的汇总单元格。

汇总列族

如需创建和更新聚合单元格,您的表中必须有一个或多个聚合列族(仅包含聚合单元格的列族)。您可以在创建表时创建列族,也可以将聚合列族添加到已在使用的表。创建列族时,您可以指定聚合类型,例如求和。

您无法将包含非汇总数据的列族转换为聚合列族。聚合列族中的列不能包含非聚合单元格,标准列族不能包含聚合单元格。

如需创建包含聚合列族的新表,请参阅创建表。如需将聚合列族添加到表中,请参阅添加列族

汇总类型

Bigtable 支持聚合类型 sum。求和支持的输入类型为 Int64

时间戳

聚合单元格由行键、列族、列限定符和时间戳定义。您每次向单元格添加数据时,都会使用相同的时间戳。如果您向相同的行键、列族和列限定符发送值但时间戳不同,则该列中会创建新的汇总单元格。

发送到汇总单元格的添加请求必须包含时间戳。

输入类型

添加请求中值的输入类型必须与创建列族时使用的输入类型一致。例如,如果您向为 Int64 配置的列族发送字符串值,请求将被拒绝。

AddToCell

添加请求会在 Bigtable Data API 中发送 AddToCell 变更。相反,非汇总写入请求会发送 SetCell 变更。如需了解详情,请参阅 Data API 参考文档AddToCell 操作受到与其他表更改相同的操作限制约束。

在复制表中,在当前复制延迟内,聚合单元在所有集群中在同一总值上收敛。总值是自上次删除操作或自该单元格创建以来,发送到所有集群中该单元格的所有 AddToCell 变更的汇总。

添加请求示例

以下示例展示了如何向汇总单元格添加值。这些示例对需要输入类型 Int64 的列族中的总和进行了求和。

cbt

cbt addtocell TABLE_ID ROW_KEY FAMILY_NAME:COLUMN_QUALIFER=VALUE@TIMESTAMP

替换以下内容:

  • TABLE_ID:表的永久标识符
  • ROW_KEY:行键
  • FAMILY_NAME:聚合列族的名称
  • COLUMN_QUALIFIER:列的标识符
  • VALUE:要添加到单元格的值
  • TIMESTAMP:以微秒为单位的 Unix 时间戳,例如 1710868850000000

示例:

cbt addtocell mobile-data device-1 updates:week12=100@1710868850000000

Go

如需了解如何安装和使用 Bigtable 的客户端库,请参阅 Bigtable 客户端库

如需向 Bigtable 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/bigtable"
)

func writeAggregate(w io.Writer, projectID, instanceID string, tableName string) error {
	// projectID := "my-project-id"
	// instanceID := "my-instance-id"
	// tableName := "mobile-time-series"

	ctx := context.Background()
	client, err := bigtable.NewClient(ctx, projectID, instanceID)
	if err != nil {
		return fmt.Errorf("bigtable.NewClient: %w", err)
	}
	defer client.Close()
	tbl := client.Open(tableName)
	columnFamilyName := "view_count"
	viewTimestamp, err := time.Parse(time.RFC3339, "2024-03-13T12:41:34Z")
	if err != nil {
		return err
	}
	hourlyBucket := viewTimestamp.Truncate(time.Hour)

	mut := bigtable.NewMutation()
	mut.AddIntToCell(columnFamilyName, "views", bigtable.Time(hourlyBucket), 1)

	rowKey := "page#index.html"
	if err := tbl.Apply(ctx, rowKey, mut); err != nil {
		return fmt.Errorf("Apply: %w", err)
	}

	fmt.Fprintf(w, "Successfully wrote row: %s\n", rowKey)
	return nil
}

Java

如需了解如何安装和使用 Bigtable 的客户端库,请参阅 Bigtable 客户端库

如需向 Bigtable 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证


import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class WriteAggregate {
  private static final String COUNT_COLUMN_FAMILY_NAME = "view_count";
  private static final long MICROS_PER_MILLI = 1000;

  public static void writeAggregate(String projectId, String instanceId, String tableId) {
    // String projectId = "my-project-id";
    // String instanceId = "my-instance-id";
    // String tableId = "page-view-counter";

    try (BigtableDataClient dataClient = BigtableDataClient.create(projectId, instanceId)) {

      String rowKey = "page#index.html";
      Instant viewTimestamp = Instant.parse("2024-03-13T12:41:34.123Z");

      // Bucket the views for an hour into a single count, giving us an hourly view count for a
      // given page.
      Instant hourlyBucket = viewTimestamp.truncatedTo(ChronoUnit.HOURS);
      long hourlyBucketMicros = hourlyBucket.toEpochMilli() * MICROS_PER_MILLI;

      RowMutation rowMutation =
          RowMutation.create(tableId, rowKey)
              .addToCell(COUNT_COLUMN_FAMILY_NAME, "views", hourlyBucketMicros, 1);

      dataClient.mutateRow(rowMutation);
      System.out.printf("Successfully wrote row %s", rowKey);

    } catch (Exception e) {
      System.out.println("Error during WriteAggregate: \n" + e.toString());
    }
  }
}

后续步骤