書き込み時に値を集計する

書き込み時に Bigtable でデータを集計する場合は、集計を使用できます。集計は、データが書き込まれる際にセル値を集計する Bigtable テーブルセルです。新しい値を追加すると、集計関数によってその値とセル内の集計値が自動的に結合されます。他のデータベースでは、同様の機能をカウンタまたは分散カウンタと呼びます。

Bigtable で提供される集計の種類は「合計」です。 合計集計セルに値を追加すると、そのセルの値は、新しく追加された値と現在のセル値の合計に置き換えられます。

プレビュー期間中は、cbt CLI と、C++、Go、Java 用の Bigtable クライアント ライブラリを使用して集計を操作できます。

このドキュメントでは、集計の概要、集計列ファミリーの作成方法、集計セルに値を追加する方法の例を示します。このドキュメントを読む前に、Bigtable の概要書き込みについて理解しておく必要があります。

集計を使用する場合

Bigtable の集計は、個々のデータポイントではなく、エンティティの集計データを扱う場合に役立ちます。Apache Cassandra や Redis などのデータベースから Bigtable に移行する場合、これまでこれらのシステムのカウンタを使用していた場所で Bigtable の集計を使用できます。

時間バケット

時間バケットを使用して、1 時間、1 日、1 週間などの期間の集計値を取得できます。テーブルに書き込まれる前または後にデータを集計するのではなく、テーブル内の集計セルに新しい値を追加します。

たとえば、慈善団体の募金を支援するサービスを運営している場合、キャンペーンごとに 1 日あたりのオンライン寄付の金額を知る必要がありますが、各寄付の正確な時間や 1 時間あたりの金額を知る必要はありません。テーブルでは、行キーがチャリティ ID を表し、donations という集計列ファミリーを作成します。行の列修飾子がキャンペーン ID です。

キャンペーンの特定の日に受け取った寄付金額ごとに、その日の列の集計セルの合計に加算されます。セルの各追加リクエストでは、その日の始めに切り捨てられたタイムスタンプを使用するため、実際には各リクエストのタイムスタンプは同じになります。タイムスタンプを切り捨てると、その日からの寄付はすべて同じセルに追加されます。翌日、新しい日付に切り捨てられたタイムスタンプを使用してすべてのリクエストが新しいセルに入り、そのパターンが続きます。

ユースケースによっては、新しい集計用に新しい列を作成することもできます。時間バケットの詳細については、時系列データ用のスキーマ設計をご覧ください。

ワークフローのストリーミング

集計を使用すると、ETL やストリーミング処理ソフトウェアを使用して、Bigtable に書き込む前または後にデータを集計することなく、Bigtable テーブルのデータを集計できます。たとえば、アプリケーションが以前に Pub / Sub にメッセージをパブリッシュし、Dataflow を使用してメッセージを読み取り、Bigtable に書き込む前にデータを集計していた場合、そのデータを Bigtable の集計セルに直接送信することもできます。

列ファミリーの集計

集計セルを作成および更新するには、テーブルに 1 つ以上の集計列ファミリー(集計セルのみを含む列ファミリー)が必要です。集計列ファミリーは、テーブルの作成時に作成するか、すでに使用されているテーブルに集計列ファミリーを追加できます。列ファミリーを作成するときに、合計などの集計タイプを指定します。

非集計データを含む列ファミリーを集計列ファミリーに変換することはできません。集計列ファミリーの列には非集計セルを含めることはできません。また、標準列ファミリーに集計セルを含めることもできません。

集計列ファミリーを持つ新しいテーブルを作成するには、テーブルの作成をご覧ください。集計列ファミリーをテーブルに追加する方法については、列ファミリーの追加をご覧ください。

集計タイプ

Bigtable では、集計型 sum がサポートされています。合計でサポートされている入力型は Int64 です。

タイムスタンプ

集計セルは行キー、列ファミリー、列修飾子、タイムスタンプによって定義されます。セルにデータを追加するたびに同じタイムスタンプを使用します。タイムスタンプが異なる同じ行キー、列ファミリー、列修飾子に値を送信すると、列に新しい集計セルが作成されます。

集計セルに送信される追加リクエストには、タイムスタンプを含める必要があります。

入力タイプ

追加リクエスト内の値の入力タイプは、列ファミリーの作成に使用される入力タイプと一致する必要があります。たとえば、Int64 用に構成された列ファミリーに文字列値を送信すると、リクエストは拒否されます。

AddToCell

追加リクエストは Bigtable Data API の AddToCell ミューテーションを送信します。対照的に、非集計書き込みリクエストは SetCell ミューテーションを送信します。詳しくは、Data API リファレンスをご覧ください。AddToCell オペレーションには、他のテーブル ミューテーションと同じオペレーション上限が適用されます。

レプリケートされたテーブルでは、集計セルは、現在のレプリケーション遅延内に存在するすべてのクラスタで同じ合計値に収束します。合計値は、最後の削除オペレーション以降、またはセルの作成以降にすべてのクラスタ内のセルに送信されたすべての AddToCell ミューテーションの合計です。

リクエストの例を追加する

次の例は、集計セルに値を追加する方法を示しています。これらの例は、入力型 Int64 を想定する列ファミリーの合計に加算されます。

cbt

cbt addtocell TABLE_ID ROW_KEY FAMILY_NAME:COLUMN_QUALIFER=VALUE@TIMESTAMP

次のように置き換えます。

  • TABLE_ID: テーブルの永続的な識別子
  • ROW_KEY: 行キー
  • FAMILY_NAME: 集計列ファミリーの名前
  • COLUMN_QUALIFIER: 列の識別子
  • VALUE: セルに追加する値
  • TIMESTAMP: マイクロ秒単位の UNIX タイムスタンプ(例: 1710868850000000

例:

cbt addtocell mobile-data device-1 updates:week12=100@1710868850000000

Go

Bigtable 用のクライアント ライブラリをインストールして使用する方法については、Bigtable クライアント ライブラリをご覧ください。

Bigtable で認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、クライアント ライブラリの認証を設定するをご覧ください。

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/bigtable"
)

func writeAggregate(w io.Writer, projectID, instanceID string, tableName string) error {
	// projectID := "my-project-id"
	// instanceID := "my-instance-id"
	// tableName := "mobile-time-series"

	ctx := context.Background()
	client, err := bigtable.NewClient(ctx, projectID, instanceID)
	if err != nil {
		return fmt.Errorf("bigtable.NewClient: %w", err)
	}
	defer client.Close()
	tbl := client.Open(tableName)
	columnFamilyName := "view_count"
	viewTimestamp, err := time.Parse(time.RFC3339, "2024-03-13T12:41:34Z")
	if err != nil {
		return err
	}
	hourlyBucket := viewTimestamp.Truncate(time.Hour)

	mut := bigtable.NewMutation()
	mut.AddIntToCell(columnFamilyName, "views", bigtable.Time(hourlyBucket), 1)

	rowKey := "page#index.html"
	if err := tbl.Apply(ctx, rowKey, mut); err != nil {
		return fmt.Errorf("Apply: %w", err)
	}

	fmt.Fprintf(w, "Successfully wrote row: %s\n", rowKey)
	return nil
}

Java

Bigtable 用のクライアント ライブラリをインストールして使用する方法については、Bigtable クライアント ライブラリをご覧ください。

Bigtable で認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、クライアント ライブラリの認証を設定するをご覧ください。


import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class WriteAggregate {
  private static final String COUNT_COLUMN_FAMILY_NAME = "view_count";
  private static final long MICROS_PER_MILLI = 1000;

  public static void writeAggregate(String projectId, String instanceId, String tableId) {
    // String projectId = "my-project-id";
    // String instanceId = "my-instance-id";
    // String tableId = "page-view-counter";

    try (BigtableDataClient dataClient = BigtableDataClient.create(projectId, instanceId)) {

      String rowKey = "page#index.html";
      Instant viewTimestamp = Instant.parse("2024-03-13T12:41:34.123Z");

      // Bucket the views for an hour into a single count, giving us an hourly view count for a
      // given page.
      Instant hourlyBucket = viewTimestamp.truncatedTo(ChronoUnit.HOURS);
      long hourlyBucketMicros = hourlyBucket.toEpochMilli() * MICROS_PER_MILLI;

      RowMutation rowMutation =
          RowMutation.create(tableId, rowKey)
              .addToCell(COUNT_COLUMN_FAMILY_NAME, "views", hourlyBucketMicros, 1);

      dataClient.mutateRow(rowMutation);
      System.out.printf("Successfully wrote row %s", rowKey);

    } catch (Exception e) {
      System.out.println("Error during WriteAggregate: \n" + e.toString());
    }
  }
}

次のステップ