Dataflow를 사용하여 변경사항 스트리밍

Bigtable Beam 커넥터를 사용하면 커넥터에서 로직을 자동으로 처리해 주므로 코드에서 파티션 변경사항을 추적하거나 처리할 필요 없이 Dataflow를 사용하여 Bigtable 데이터 변경 레코드를 읽을 수 있습니다.

이 문서에서는 Bigtable Beam 커넥터를 구성하고 사용하여 Dataflow 파이프라인을 통해 변경 내역을 읽는 방법을 설명합니다. 이 문서를 읽기 전에 변경 내역 개요를 읽고 Dataflow에 익숙해져야 합니다.

자체 파이프라인 빌드의 대안

자체 Dataflow 파이프라인을 빌드하지 않으려면 다음 옵션 중 하나를 사용하면 됩니다.

Google에서 제공하는 Dataflow 템플릿을 사용할 수 있습니다.

Bigtable 튜토리얼 또는 빠른 시작의 코드 샘플을 코드의 시작점으로 사용할 수도 있습니다.

생성하는 코드에 google cloud libraries-bom 버전 26.14.0 이상이 사용되어야 합니다.

커넥터 세부정보

BigtableIO.readChangeStream이라는 Bigtable Beam 커넥터 메서드를 사용하면 사용자가 처리할 수 있는 데이터 변경 레코드(ChangeStreamMutation)의 스트림을 읽을 수 있습니다. Bigtable Beam 커넥터는 Apache Beam GitHub 저장소 구성요소입니다. 커넥터 코드에 대한 설명은 BigtableIO.java의 주석을 참조하세요.

Beam 버전 2.48.0 이상에서 커넥터를 사용해야 합니다. Apache Beam 런타임 지원을 검사하여 지원되는 자바 버전을 사용 중인지 확인합니다. 그런 다음 리소스 프로비저닝 및 관리를 처리하고 스트림 데이터 처리의 확장성과 안정성을 지원하는 Dataflow에 커넥터를 사용하는 파이프라인을 배포할 수 있습니다.

Apache Beam 프로그래밍 모델에 대한 자세한 내용은 Beam 문서를 참조하세요.

이벤트 시간 없이 데이터 그룹화

Bigtable Beam 커넥터를 사용하여 스트리밍된 데이터 변경 레코드는 이벤트 시간에 따라 달라지는 Dataflow 함수와 호환되지 않습니다.

복제 및 워터마크에 설명된 대로 파티션의 복제가 인스턴스의 나머지 부분을 따라잡지 못하면 낮은 워터마크가 진행되지 않을 수 있습니다. 낮은 워터마크가 진행을 중지하면 해당 변경 내역이 중단될 수 있습니다.

스트림이 중단되지 않도록 Bigtable Beam 커넥터는 출력 타임스탬프가 0인 모든 데이터를 출력합니다. 영(0) 타임스탬프는 Dataflow가 모든 데이터 변경 레코드를 지연 데이터로 간주하도록 합니다. 따라서 이벤트 시간에 의존하는 Dataflow 기능이 Bigtable 변경 내역과 호환되지 않습니다. 구체적으로는 윈도우 함수, 이벤트 시간 트리거 또는 이벤트- 시간 타이머를 사용할 수 없습니다.

대신 튜토리얼의 예시에 설명된 대로 이벤트 이외의 시간 트리거와 함께 GlobalWindows를 사용하여 이 지연 데이터를 창으로 그룹화할 수 있습니다. 트리거 및 창에 대한 자세한 내용은 Beam 프로그래밍 가이드의 트리거를 참조하세요.

자동 확장

커넥터는 Dataflow 자동 확장을 지원하며, 이는 Runner v2(필수)를 사용할 때 기본적으로 사용 설정됩니다. Dataflow 자동 확장 알고리즘은 Backlog 섹션의 Dataflow 모니터링 페이지에서 모니터링할 수 있는 예상 변경 내역 백로그를 고려합니다. 작업자 수를 제한하는 작업을 배포할 때 --maxNumWorkers 플래그를 사용합니다.

자동 확장을 사용하는 대신 파이프라인을 수동으로 확장하려면 스트리밍 파이프라인 수동 확장을 참조하세요.

제한사항

Dataflow에 Bigtable Beam 커넥터를 사용하기 전에 다음 제한사항을 확인하세요.

Dataflow Runner V2

이 커넥터는 Dataflow Runner v2를 사용해서만 실행할 수 있습니다. 이 기능을 사용 설정하려면 명령줄 인수에 --experiments=use_runner_v2를 지정하세요. Runner v1로 실행하면 파이프라인이 실패하고 다음 예외가 발생합니다.

java.lang.UnsupportedOperationException: BundleFinalizer unsupported by non-portable Dataflow

스냅샷

이 커넥터는 Dataflow 스냅샷을 지원하지 않습니다.

중복

Bigtable Beam 커넥터는 커밋 타임스탬프 순서로 각 row key와 각 클러스터의 변경사항을 스트리밍하지만 스트림의 이전 시점에서 다시 시작되는 경우가 있으므로 중복이 생성될 수 있습니다.

시작하기 전에

커넥터를 사용하기 전에 다음 기본 요건을 완료하세요.

인증 설정

이 페이지의 Java 샘플을 로컬 개발 환경에서 사용하려면 gcloud CLI를 설치 및 초기화한 다음 사용자 인증 정보로 애플리케이션 기본 사용자 인증 정보를 설정하세요.

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

자세한 내용은 다음을 참조하세요: Set up authentication for a local development environment.

프로덕션 환경의 인증 설정에 대한 자세한 내용은 Set up Application Default Credentials for code running on Google Cloud를 참조하세요.

변경 내역 사용 설정

먼저 테이블에서 변경 내역을 사용 설정해야 이를 읽을 수 있습니다. 변경 내역이 사용 설정된 새 테이블을 만들 수도 있습니다.

변경 내역 메타데이터 테이블

Dataflow를 사용하여 변경사항을 스트리밍하면 Bigtable Beam 커넥터가 기본적으로 __change_stream_md_table이라는 메타데이터 테이블을 만듭니다. 변경 내역 메타데이터 테이블은 커넥터의 작동 상태를 관리하고 데이터 변경 레코드에 대한 메타데이터를 저장합니다.

기본적으로 커넥터는 스트리밍되는 테이블과 동일한 인스턴스에 테이블을 만듭니다. 테이블이 올바르게 작동하는지 확인하려면 메타데이터 테이블의 앱 프로필에서 단일 클러스터 라우팅을 사용하고 단일 행 트랜잭션이 사용 설정되어 있어야 합니다.

Bigtable Beam 커넥터를 사용하여 Bigtable에서 변경사항을 스트리밍하는 방법에 대한 자세한 내용은 BigtableIO 문서를 참조하세요.

필요한 역할

Dataflow를 사용하여 Bigtable 변경 내역을 읽는 데 필요한 권한을 얻으려면 관리자에게 다음 IAM 역할을 부여해 달라고 요청하세요.

Bigtable의 변경사항을 읽으려면 다음 역할이 필요합니다.

  • 변경사항을 스트리밍해올 테이블이 포함된 Bigtable 인스턴스의 Bigtable 관리자(roles/bigtable.admin)

Dataflow 작업을 실행하려면 다음 역할이 필요합니다.

역할 부여에 대한 자세한 내용은 액세스 관리를 참조하세요.

커스텀 역할이나 다른 사전 정의된 역할을 통해 필요한 권한을 얻을 수도 있습니다.

Bigtable Beam 커넥터를 종속 항목으로 추가

Maven pom.xml 파일에 다음 종속 항목과 유사한 코드를 추가합니다. 2.48.0 이상 버전이어야 합니다.

<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

변경 내역 읽기

데이터 변경 레코드를 읽는 Dataflow 파이프라인을 빌드하려면 커넥터를 구성한 후 변환 및 싱크를 추가합니다. 그런 다음 커넥터를 사용하여 Beam 파이프라인에서 ChangeStreamMutation 객체를 읽습니다.

Java로 작성된 이 섹션의 코드 샘플은 파이프라인을 빌드하고 이를 사용하여 키-값 쌍을 문자열로 변환하는 방법을 보여줍니다. 각 쌍은 row key와 ChangeStreamMutation 객체로 구성됩니다. 파이프라인에서 각 객체의 항목을 쉼표로 구분된 문자열로 변환합니다.

파이프라인 빌드

이 Java 코드 샘플은 파이프라인 빌드 방법을 보여줍니다.

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

final Instant startTime = Instant.now();

p.apply(
        "Read Change Stream",
        BigtableIO.readChangeStream()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withAppProfileId(options.getBigtableAppProfile())
            .withStartTime(startTime))
    .apply(
        "Flatten Mutation Entries",
        FlatMapElements.into(TypeDescriptors.strings())
            .via(ChangeStreamsHelloWorld::mutationEntriesToString))
    .apply(
        "Print mutations",
        ParDo.of(
            new DoFn<String, Void>() { // a DoFn as an anonymous inner class instance
              @ProcessElement
              public void processElement(@Element String mutation) {
                System.out.println("Change captured: " + mutation);
              }
            }));
p.run();

데이터 변경 레코드 처리

이 샘플은 행의 데이터 변경 레코드에 있는 모든 항목을 반복하고 항목 유형에 따라 문자열 변환 메서드를 호출하는 방법을 보여줍니다.

데이터 변경 레코드에 포함될 수 있는 항목 유형 목록은 데이터 변경 레코드에 포함되는 항목을 참조하세요.

static List<String> mutationEntriesToString(KV<ByteString, ChangeStreamMutation> mutationPair) {
  List<String> mutations = new ArrayList<>();
  String rowKey = mutationPair.getKey().toStringUtf8();
  ChangeStreamMutation mutation = mutationPair.getValue();
  MutationType mutationType = mutation.getType();
  for (Entry entry : mutation.getEntries()) {
    if (entry instanceof SetCell) {
      mutations.add(setCellToString(rowKey, mutationType, (SetCell) entry));
    } else if (entry instanceof DeleteCells) {
      mutations.add(deleteCellsToString(rowKey, mutationType, (DeleteCells) entry));
    } else if (entry instanceof DeleteFamily) {
      // Note: DeleteRow mutations are mapped into one DeleteFamily per-family
      mutations.add(deleteFamilyToString(rowKey, mutationType, (DeleteFamily) entry));
    } else {
      throw new RuntimeException("Entry type not supported.");
    }
  }
  return mutations;
}

이 샘플에서는 write 항목이 변환됩니다.

private static String setCellToString(String rowKey, MutationType mutationType, SetCell setCell) {
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "SetCell",
          setCell.getFamilyName(),
          setCell.getQualifier().toStringUtf8(),
          setCell.getValue().toStringUtf8());
  return String.join(",", mutationParts);
}

이 샘플에서는 셀 삭제 항목이 변환됩니다.

private static String deleteCellsToString(
    String rowKey, MutationType mutationType, DeleteCells deleteCells) {
  String timestampRange =
      deleteCells.getTimestampRange().getStart() + "-" + deleteCells.getTimestampRange().getEnd();
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "DeleteCells",
          deleteCells.getFamilyName(),
          deleteCells.getQualifier().toStringUtf8(),
          timestampRange);
  return String.join(",", mutationParts);
}

이 샘플에서는 column family 삭제 항목이 변환됩니다.


private static String deleteFamilyToString(
    String rowKey, MutationType mutationType, DeleteFamily deleteFamily) {
  List<String> mutationParts =
      Arrays.asList(rowKey, mutationType.name(), "DeleteFamily", deleteFamily.getFamilyName());
  return String.join(",", mutationParts);
}

모니터링

Google Cloud 콘솔의 다음 리소스를 사용하면 Dataflow 파이프라인을 실행하여 Bigtable 변경 내역을 읽는 동안 Google Cloud 리소스를 모니터링할 수 있습니다.

특히 다음 측정항목을 확인합니다.

  • Bigtable 모니터링 페이지에서 다음 측정항목을 확인합니다.
    • cpu_load_by_app_profile_by_method_by_table 측정항목의 변경 내역별 CPU 사용률 데이터. 변경 내역이 클러스터의 CPU 사용량에 미치는 영향을 보여줍니다.
    • 변경 내역 스토리지 사용률(바이트) (change_stream_log_used_bytes).
  • Dataflow 모니터링 페이지에서 현재 시간과 워터마크의 차이를 보여주는 데이터 최신 상태를 확인합니다. 약 2분 정도 소요되며, 가끔 급증이 발생할 경우 1~2분 정도 더 걸립니다. 데이터 최신 상태 측정항목이 이 기준점보다 지속적으로 높으면 파이프라인에 대한 리소스가 부족할 가능성이 높으므로 Dataflow 작업자를 더 추가해야 합니다. 데이터 최신 상태는 데이터 변경 레코드가 느리게 처리되는지 여부를 나타내지 않습니다.
  • Dataflow processing_delay_from_commit_timestamp_MEAN 측정항목은 작업의 전체 기간 동안 데이터 변경 레코드의 평균 처리 시간을 알려줄 수 있습니다.

Bigtable server/latencies 측정항목은 데이터 변경 레코드 처리 지연 시간이 아닌 스트리밍 요청 기간을 반영하므로 Bigtable 변경 내역을 읽는 Dataflow 파이프라인을 모니터링할 때 유용하지 않습니다. 변경 내역의 지연 시간이 높다고 해서 요청이 느리게 처리된다는 의미는 아닙니다. 연결이 오랫동안 유지되었음을 의미합니다.

다음 단계