書き込み時に値を集計する

書き込み時に Bigtable でデータを集計する場合は、集計を使用できます。集計は、データが書き込まれる際にセル値を集計する Bigtable テーブルセルです。新しい値を追加すると、集計関数によってその値とセル内の集計値が自動的に結合されます。他のデータベースでは、同様の機能をカウンタまたは分散カウンタと呼びます。

集計を操作するには、cbt CLI と、C++、Go、Java 用の Bigtable クライアント ライブラリを使用します。

このドキュメントでは、集計の概要、集計列ファミリーの作成方法、集計セルに値を追加する方法の例について説明します。このドキュメントを読む前に、Bigtable の概要および書き込みについて理解しておく必要があります。

集計を使用する場合

Bigtable 集計は、個々のデータポイントではなく、エンティティのデータを集計する場合に便利です。Apache Cassandra や Redis などのデータベースから Bigtable に移行する場合、以前はこれらのシステムのカウンターに依存していた場所で Bigtable 集計を使用できます。

時間バケット

時間バケットを使用すると、時間、日、週などの期間の集計値を取得できます。テーブルに書き込まれる前または後にデータを集計するのではなく、テーブル内の集計セルに新しい値を追加します。

たとえば、慈善団体の募金を支援するサービスを運営している場合、キャンペーンごとに 1 日あたりのオンライン寄付の金額を知る必要がありますが、各寄付の正確な時間や 1 時間あたりの金額を知る必要はありません。テーブルでは、行キーがチャリティ ID を表し、donations という集計列ファミリーを作成します。行内の列修飾子はキャンペーン ID です。

キャンペーンで 1 日に受け取った寄付の金額に応じて、その日の列の集計セルの合計に加算されます。セルの各追加リクエストでは、その日の始めに切り捨てられたタイムスタンプを使用するため、実際には各リクエストのタイムスタンプは同じになります。タイムスタンプを切り捨てることで、その日の寄付がすべて同じセルに追加されます。翌日、すべてのリクエストが新しいセルに移動し、新しい日付に切り捨てられたタイムスタンプが使用されます。このパターンは続きます。

ユースケースによっては、代わりに新しい集計用に新しい列を作成することを選択することもできます。使用するバケットの数によっては、行キーの設計を変えることを検討してください

時間バケットの詳細については、時系列のスキーマの設計データをご覧ください。

ワークフローのストリーミング

集計を使用すると、Bigtable にデータを書き込む前または後に ETL またはストリーミング処理ソフトウェアを使用してデータを集計する必要なく、Bigtable テーブルにデータを集計できます。たとえば、アプリケーションが以前に Pub/Sub にメッセージをパブリッシュし、Dataflow を使用してメッセージを読み取り、Bigtable に書き込む前にデータを集計していた場合、そのデータを Bigtable の集計セルに直接送信することもできます。

列ファミリーを集計する

集計セルを作成および更新するには、テーブルに 1 つ以上の集計列ファミリー(集計セルのみを含む列ファミリー)が必要です。テーブルを作成するときに作成することも、すでに使用されているテーブルに集計列ファミリを追加することもできます。列ファミリーを作成する場合は、合計などの集計方法を指定します。

非集計データを含む列ファミリーを集計列ファミリーに変換することはできません。集計列ファミリー内の列には非集計セルを含めることはできず、標準列ファミリーには集計セルを含めることはできません。

集計列ファミリーを含む新しいテーブルを作成するには、テーブルの作成をご覧ください。集計列ファミリーをテーブルに追加するには、列ファミリーの追加をご覧ください。

集計タイプ

Bigtable は、次の集計タイプをサポートしています。

合計

合計の集計セル(sum)に値を追加すると、セルの値は新しく追加された値と現在のセルの値の合計に置き換えられます。合計にサポートされている入力タイプは Int64 です。

最小

最小集計セル(min)に値を追加すると、セルの値は新しく追加された値と現在のセル値の間の低い値に置き換えられます。最小にサポートされている入力タイプは Int64 です。

最大

最大集計セル(max)に値を追加すると、セルの値は新しく追加された値と現在のセル値の間の高い値に置き換えられます。最大にサポートされている入力タイプは Int64 です。

HyperLogLog(HLL)

HLL 集計セル(inthll)に値を追加すると、その値は直近のリセット(セルの作成またはデータの削除)以降に追加されたすべての値の確率セットに追加されます。セルの値は、そのセットの状態を表します。HLL アルゴリズムに関するより一般的な情報については、HyperLogLog をご覧ください。

HLL 値は Zetasketch ライブラリを使用して読み取ることができます。詳細については、Zetasketch GitHub リポジトリをご覧ください。HLL にサポートされている入力タイプは BYTES です。

タイムスタンプ

集計セルは、行キー、列ファミリー、列修飾子、タイムスタンプで定義されます。セルにデータを追加するたびに、同じタイムスタンプを使用します。タイムスタンプが異なる同じ行キー、列ファミリー、列修飾子に値を送信すると、列に新しい集計セルが作成されます。

集計セルに送信される追加リクエストには、タイムスタンプを指定する必要があります。

入力タイプ

追加リクエストの値の入力タイプは、列ファミリの作成に使用された入力タイプと一致する必要があります。たとえば、Int64 用に構成された列ファミリーに文字列値を送信すると、リクエストは拒否されます。

AddToCell

追加リクエストは Bigtable Data API の AddToCell ミューテーションを送信します。一方、非集計書き込みリクエストは SetCell ミューテーションを送信します。詳しくは Data API リファレンスをご覧ください。AddToCell オペレーションには、他のテーブル変更と同様のオペレーションの上限が適用されます。

複製されたテーブルでは、集計セルは現在のレプリケーション遅延内のすべてのクラスタで同じ最終値に収束します。最終値は、最後の削除オペレーション以降またはセルの作成以降にすべてのクラスタでそのセルに送信されたすべての AddToCell ミューテーションの集計です。

ガベージ コレクション

集計セルは、ガベージ コレクション中に他のセルと同様に処理されます。セルが削除対象としてマークされると、削除はインスタンス内のすべてのクラスタに複製されます。詳細については、レプリケーションとガベージ コレクションをご覧ください。ガベージ コレクションによって削除された集計セルに追加リクエストが送信された場合、新しい集計セルが作成されます。

リクエストの例を追加する

次の例は、集計セルに値を追加する方法を示しています。これらの例では、入力タイプ Int64 を想定する列ファミリーの合計に追加します。

cbt

cbt addtocell TABLE_ID ROW_KEY FAMILY_NAME:COLUMN_QUALIFER=VALUE@TIMESTAMP

以下を置き換えます。

  • TABLE_ID: テーブルの永続的な識別子
  • ROW_KEY: 行キー
  • FAMILY_NAME: 集計列ファミリーの名前
  • COLUMN_QUALIFIER: 列の識別子
  • VALUE: セルに追加する値
  • TIMESTAMP: Unix タイムスタンプ(マイクロ秒単位)。例: 1710868850000000

例:

cbt addtocell mobile-data device-1 updates:week12=100@1710868850000000

Go

Bigtable 用のクライアント ライブラリをインストールして使用する方法については、Bigtable クライアント ライブラリをご覧ください。

Bigtable で認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、クライアント ライブラリの認証を設定するをご覧ください。

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/bigtable"
)

func writeAggregate(w io.Writer, projectID, instanceID string, tableName string) error {
	// projectID := "my-project-id"
	// instanceID := "my-instance-id"
	// tableName := "mobile-time-series"

	ctx := context.Background()
	client, err := bigtable.NewClient(ctx, projectID, instanceID)
	if err != nil {
		return fmt.Errorf("bigtable.NewClient: %w", err)
	}
	defer client.Close()
	tbl := client.Open(tableName)
	columnFamilyName := "view_count"
	viewTimestamp, err := time.Parse(time.RFC3339, "2024-03-13T12:41:34Z")
	if err != nil {
		return err
	}
	hourlyBucket := viewTimestamp.Truncate(time.Hour)

	mut := bigtable.NewMutation()
	mut.AddIntToCell(columnFamilyName, "views", bigtable.Time(hourlyBucket), 1)

	rowKey := "page#index.html"
	if err := tbl.Apply(ctx, rowKey, mut); err != nil {
		return fmt.Errorf("Apply: %w", err)
	}

	fmt.Fprintf(w, "Successfully wrote row: %s\n", rowKey)
	return nil
}

Java

Bigtable 用のクライアント ライブラリをインストールして使用する方法については、Bigtable クライアント ライブラリをご覧ください。

Bigtable で認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、クライアント ライブラリの認証を設定するをご覧ください。


import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class WriteAggregate {
  private static final String COUNT_COLUMN_FAMILY_NAME = "view_count";
  private static final long MICROS_PER_MILLI = 1000;

  public static void writeAggregate(String projectId, String instanceId, String tableId) {
    // String projectId = "my-project-id";
    // String instanceId = "my-instance-id";
    // String tableId = "page-view-counter";

    try (BigtableDataClient dataClient = BigtableDataClient.create(projectId, instanceId)) {

      String rowKey = "page#index.html";
      Instant viewTimestamp = Instant.parse("2024-03-13T12:41:34.123Z");

      // Bucket the views for an hour into a single count, giving us an hourly view count for a
      // given page.
      Instant hourlyBucket = viewTimestamp.truncatedTo(ChronoUnit.HOURS);
      long hourlyBucketMicros = hourlyBucket.toEpochMilli() * MICROS_PER_MILLI;

      RowMutation rowMutation =
          RowMutation.create(tableId, rowKey)
              .addToCell(COUNT_COLUMN_FAMILY_NAME, "views", hourlyBucketMicros, 1);

      dataClient.mutateRow(rowMutation);
      System.out.printf("Successfully wrote row %s", rowKey);

    } catch (Exception e) {
      System.out.println("Error during WriteAggregate: \n" + e.toString());
    }
  }
}

次のステップ