쓰기 시간에 값 집계

쓰기 시점에 Bigtable에서 데이터를 집계하려면 집계를 사용하면 됩니다. 집계는 데이터가 작성될 때 셀 값을 집계하는 Bigtable 테이블 셀입니다. 새 값을 추가하면 집계 함수가 해당 값을 이미 셀에 있는 집계된 값과 병합합니다. 다른 데이터베이스는 이와 비슷한 기능을 카운터 또는 분산 카운터라고 지칭합니다.

cbt CLI와 C++, Go, 자바용 Bigtable 클라이언트 라이브러리를 사용하여 집계 작업을 수행할 수 있습니다.

이 문서에서는 집계에 대한 개요를 제공하고 집계 column family를 만드는 방법을 설명하며 집계 셀에 값을 추가하는 방법을 보여주는 예시를 제공합니다. 이 문서를 읽기 전에 Bigtable 개요쓰기를 숙지해야 합니다.

집계를 사용해야 하는 경우

Bigtable 집계는 개별 데이터 포인트가 아닌 집계된 항목의 데이터에 관심이 있는 경우에 유용합니다. Apache Cassandra 또는 Redis와 같은 데이터베이스에서 Bigtable로 마이그레이션하는 경우 이전에 이러한 시스템에서 카운터를 사용했던 위치에 Bigtable 집계를 사용할 수 있습니다.

시간 버킷

시간 버킷을 사용하여 시간, 일, 주와 같은 기간에 대한 집계 값을 가져올 수 있습니다. 데이터를 테이블에 쓰기 전후에 집계하는 대신 테이블의 집계 셀에 새 값을 추가합니다.

예를 들어 자선단체에 자금을 조달하는 데 도움을 주는 서비스를 운영하는 경우 각 캠페인의 일일 온라인 기부 금액을 알고 싶지만 각 기부의 정확한 시간이나 시간당 금액을 알 필요가 없습니다. 테이블에서 행 키는 자선단체 ID를 나타내고 donations라는 집계 column family를 만듭니다. 행의 열 한정자는 캠페인 ID입니다.

캠페인에 대해 지정된 날에 받은 각 기부 금액은 해당 날짜의 열에 있는 집계 셀 합계에 추가됩니다. 셀에 대한 각 추가 요청은 하루의 시작 부분으로 잘린 타임스탬프를 사용하므로 결과적으로 각 요청에는 동일한 타임스탬프가 있습니다. 타임스탬프를 자르면 해당 일의 모든 기부가 동일한 셀에 추가됩니다. 다음 날 모든 요청이 새 날짜로 잘린 타임스탬프를 사용하여 새 셀로 이동하며 이 패턴은 계속됩니다.

사용 사례에 따라 새 집계를 위한 새 열을 대신 생성할 수도 있습니다. 누적할 버킷 수에 따라 다른 row key 설계를 고려할 수 있습니다.

시간 버킷에 관한 자세한 내용은 시계열 데이터에 대한 스키마 설계를 참고하세요.

워크플로 간소화

집계를 사용하면 Bigtable에 데이터를 쓰기 전후에 ETL 또는 스트리밍 처리 소프트웨어를 사용하여 데이터를 집계할 필요 없이 Bigtable 테이블에서 데이터를 집계할 수 있습니다. 예를 들어 애플리케이션이 이전에는 Pub/Sub에 메시지를 게시한 후 데이터를 Bigtable에 기록하기 전에 Dataflow를 사용하여 메시지를 읽고 데이터를 집계했다면 이제 데이터를 Bigtable의 집계 셀에 직접 전송할 수 있습니다.

Column family 집계

집계 셀을 만들고 업데이트하려면 테이블에 집계 셀만 포함된 집계 column family가 하나 이상 있어야 합니다. 테이블을 만들 때 만들거나 이미 사용 중인 테이블에 집계 column family를 추가할 수 있습니다. column family를 만들 때는 sum과 같은 집계 유형을 지정합니다.

집계되지 않은 데이터가 포함된 column family를 집계 column family로 변환할 수는 없습니다. 집계 column family 열에는 비집계 셀을 포함할 수 없으며 표준 column family에는 집계 셀을 포함할 수 없습니다.

집계 column family가 있는 새 테이블을 만들려면 테이블 만들기를 참고하세요. 테이블에 집계 column family를 추가하려면 column family 추가를 참고하세요.

집계 유형

Bigtable은 다음 집계 유형을 지원합니다.

합계

합계 집계 셀(sum)에 값을 추가하면 셀 값은 새로 추가된 값과 현재 셀 값의 합계로 대체됩니다. 합계에 지원되는 입력 유형은 Int64입니다.

최소

최솟값 집계 셀(min)에 값을 추가하면 셀 값은 새로 추가된 값과 현재 셀 값 중 더 낮은 값으로 대체됩니다. min에 지원되는 입력 유형은 Int64입니다.

최대

최대 집계 셀(max)에 값을 추가하면 셀 값은 새로 추가된 값과 현재 셀 값 중 더 큰 값으로 대체됩니다. max에 지원되는 입력 유형은 Int64입니다.

HyperLogLog(HLL)

HLL 집계 셀(inthll)에 값을 추가하면 가장 최근 재설정(셀 생성 또는 데이터 삭제) 이후 추가된 모든 값의 확률적 집합에 값이 추가됩니다. 셀 값은 해당 집합의 상태를 나타냅니다. HLL 알고리즘에 관한 일반적인 내용은 HyperLogLog를 참고하세요.

Zetasketch 라이브러리를 사용하여 HLL 값을 읽을 수 있습니다. 자세한 내용은 Zetasketch GitHub 저장소를 참고하세요. HLL에 지원되는 입력 유형은 BYTES입니다.

타임스탬프

집계 셀은 row key, column family, column qualifier, 타임스탬프로 정의됩니다. 셀에 데이터를 추가할 때마다 동일한 타임스탬프를 사용합니다. 동일한 row key, column family, column qualifier 및 다른 타임스탬프로 값을 전송하면 열에 새로운 집계 셀이 생성됩니다.

집계 셀로 전송되는 추가 요청에는 타임스탬프가 포함되어야 합니다.

입력 유형

추가 요청의 값 입력 유형은 column family를 만들 때 사용하는 입력 유형과 일치해야 합니다. 예를 들어 Int64로 구성된 column family에 문자열 값을 전송하면 요청이 거부됩니다.

AddToCell

추가 요청은 Bigtable Data API에서 AddToCell 변형을 전송합니다. 반면 비집계 쓰기 요청은 SetCell 변형을 전송합니다. 자세한 내용은 Data API 참조를 참고하세요. AddToCell 작업에는 다른 테이블 변이와 동일한 작업 제한이 적용됩니다.

복제된 테이블에서 집계 셀은 현재 복제 지연 내 모든 클러스터에서 동일한 최종 값으로 수렴합니다. 최종 값은 마지막 삭제 작업 이후 또는 셀이 생성된 이후 모든 클러스터에서 해당 셀에 전송된 모든 AddToCell 변형의 집계입니다.

가비지 컬렉션

집계 셀은 가비지 컬렉션 중에 다른 셀과 마찬가지로 처리됩니다. 셀이 삭제 대상으로 표시되면 삭제가 인스턴스의 모든 클러스터에 복제됩니다. 자세한 내용은 복제 및 가비지 컬렉션을 참고하세요. 가비지 컬렉션에 의해 삭제된 집계 셀에 추가 요청이 전송되면 새 집계 셀이 생성됩니다.

추가 요청 예시

다음 예는 집계 셀에 값을 추가하는 방법을 보여줍니다. 이 예시는 입력 유형 Int64를 예상하는 column family의 합계에 추가됩니다.

cbt

cbt addtocell TABLE_ID ROW_KEY FAMILY_NAME:COLUMN_QUALIFER=VALUE@TIMESTAMP

다음을 바꿉니다.

  • TABLE_ID: 테이블의 영구 식별자
  • ROW_KEY: row key
  • FAMILY_NAME: 집계 column family의 이름
  • COLUMN_QUALIFIER: 열의 식별자
  • VALUE: 셀에 추가할 값
  • TIMESTAMP: 마이크로초 단위의 Unix 타임스탬프(예: 1710868850000000)

예를 들면 다음과 같습니다.

cbt addtocell mobile-data device-1 updates:week12=100@1710868850000000

Go

Bigtable용 클라이언트 라이브러리를 설치하고 사용하는 방법은 Bigtable 클라이언트 라이브러리를 참조하세요.

Bigtable에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 클라이언트 라이브러리의 인증 설정을 참조하세요.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/bigtable"
)

func writeAggregate(w io.Writer, projectID, instanceID string, tableName string) error {
	// projectID := "my-project-id"
	// instanceID := "my-instance-id"
	// tableName := "mobile-time-series"

	ctx := context.Background()
	client, err := bigtable.NewClient(ctx, projectID, instanceID)
	if err != nil {
		return fmt.Errorf("bigtable.NewClient: %w", err)
	}
	defer client.Close()
	tbl := client.Open(tableName)
	columnFamilyName := "view_count"
	viewTimestamp, err := time.Parse(time.RFC3339, "2024-03-13T12:41:34Z")
	if err != nil {
		return err
	}
	hourlyBucket := viewTimestamp.Truncate(time.Hour)

	mut := bigtable.NewMutation()
	mut.AddIntToCell(columnFamilyName, "views", bigtable.Time(hourlyBucket), 1)

	rowKey := "page#index.html"
	if err := tbl.Apply(ctx, rowKey, mut); err != nil {
		return fmt.Errorf("Apply: %w", err)
	}

	fmt.Fprintf(w, "Successfully wrote row: %s\n", rowKey)
	return nil
}

자바

Bigtable용 클라이언트 라이브러리를 설치하고 사용하는 방법은 Bigtable 클라이언트 라이브러리를 참조하세요.

Bigtable에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 클라이언트 라이브러리의 인증 설정을 참조하세요.


import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class WriteAggregate {
  private static final String COUNT_COLUMN_FAMILY_NAME = "view_count";
  private static final long MICROS_PER_MILLI = 1000;

  public static void writeAggregate(String projectId, String instanceId, String tableId) {
    // String projectId = "my-project-id";
    // String instanceId = "my-instance-id";
    // String tableId = "page-view-counter";

    try (BigtableDataClient dataClient = BigtableDataClient.create(projectId, instanceId)) {

      String rowKey = "page#index.html";
      Instant viewTimestamp = Instant.parse("2024-03-13T12:41:34.123Z");

      // Bucket the views for an hour into a single count, giving us an hourly view count for a
      // given page.
      Instant hourlyBucket = viewTimestamp.truncatedTo(ChronoUnit.HOURS);
      long hourlyBucketMicros = hourlyBucket.toEpochMilli() * MICROS_PER_MILLI;

      RowMutation rowMutation =
          RowMutation.create(tableId, rowKey)
              .addToCell(COUNT_COLUMN_FAMILY_NAME, "views", hourlyBucketMicros, 1);

      dataClient.mutateRow(rowMutation);
      System.out.printf("Successfully wrote row %s", rowKey);

    } catch (Exception e) {
      System.out.println("Error during WriteAggregate: \n" + e.toString());
    }
  }
}

다음 단계