Java로 변경 내역 읽기

Java용 Cloud Bigtable 클라이언트 라이브러리는 데이터 변경 레코드 처리를 위한 하위 수준 메서드를 제공합니다. 하지만 대부분의 경우, 이 페이지에 설명된 방법을 사용하는 대신 Dataflow를 사용하여 변경사항을 스트리밍하는 것을 추천합니다. Dataflow가 파티션 분할 및 병합을 처리해주기 때문입니다.

시작하기 전에

Java로 변경 내역을 읽기 전에 변경 내역 개요를 숙지하세요. 그런 후, 다음 기본 요건을 완료하세요.

인증 설정

로컬 개발 환경에서 이 페이지의 Java 샘플을 사용하려면 gcloud CLI를 설치 및 초기화한 다음 사용자 인증 정보로 애플리케이션 기본 사용자 인증 정보를 설정하세요.

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

자세한 내용은 다음을 참조하세요: Set up authentication for a local development environment.

프로덕션 환경의 인증 설정에 대한 자세한 내용은 Set up Application Default Credentials for code running on Google Cloud를 참조하세요.

변경 내역 사용 설정

먼저 테이블에서 변경 내역을 사용 설정해야 이를 읽을 수 있습니다. 변경 내역이 사용 설정된 새 테이블을 만들 수도 있습니다.

필요한 역할

Bigtable 변경 내역을 읽는 데 필요한 권한을 얻으려면 관리자에게 다음 IAM 역할을 부여해 달라고 요청하세요.

  • 변경사항을 스트리밍해올 테이블이 포함된 Bigtable 인스턴스의 Bigtable 관리자(roles/bigtable.admin)

Java 클라이언트 라이브러리를 종속 항목으로 추가

다음과 비슷한 코드를 Maven pom.xml 파일에 추가합니다. VERSION을 사용 중인 클라이언트 라이브러리의 버전으로 바꿉니다. 2.21.0 이상 버전이어야 합니다.

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigtable</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

테이블 파티션 확인

ReadChangeStream 요청을 시작하려면 테이블의 파티션을 알아야 합니다. 테이블 파티션은 GenerateInitialChangeStreamPartitions 메서드를 사용하여 확인할 수 있습니다. 다음 예시에서는 이 메서드를 사용하여 테이블의 각 파티션을 나타내는 ByteStringRanges 스트림을 확인하는 방법을 보여줍니다. 각 ByteStringRange에는 파티션의 시작 키와 종료 키가 포함되어 있습니다.

ServerStream<ByteStringRange> partitionStream =
    client.generateInitialChangeStreamPartitions("MyTable");

각 파티션의 변경사항 처리

그런 후 ReadChangeStream 메서드를 사용하여 각 파티션의 변경사항을 처리할 수 있습니다. 다음은 현재 시간부터 시작해서 파티션의 스트림을 여는 방법의 예시입니다.

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("MyTable")
        .streamPartition(partition)
        .startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);

ReadChangeStreamQuery에는 다음 인수가 사용됩니다.

  • 스트림 파티션(필수) - 변경사항을 스트리밍해올 파티션
  • 다음 중 하나:
    • 시작 시간 - 변경사항 처리를 시작할 커밋 타임스탬프
    • 연속 토큰 - 스트리밍을 재개할 위치를 나타내는 토크입니다.
  • 종료 시간(선택사항) - 도달 시 변경사항 처리를 중지할 커밋 타임스탬프. 값을 제공하지 않으면 스트림 읽기가 계속됩니다.
  • 하트비트 기간(선택사항) - 새 변경사항이 없는 경우의 하트비트 메시지 빈도(기본값 5초)

변경 내역 레코드 형식

반환된 변경 내역 레코드는 다음 세 가지 응답 유형 중 하나입니다.

  • ChangeStreamMutation - 데이터 변경 레코드를 나타내는 메시지.

  • CloseStream - 클라이언트가 스트림에서 읽기를 중지해야 함을 나타내는 메시지.

    • 상태 - 스트림을 종료하는 이유를 나타냅니다. 다음 중 하나:
      • OK - 지정된 파티션의 종료 시간에 도달했습니다.
      • OUT_OF_RANGE - 지정된 파티션이 더 이상 존재하지 않습니다. 즉, 이 파티션에서 분할 또는 병합이 수행되었습니다. 각각의 새 파티션에 대해 새 ReadChangeStream 요청을 만들어야 합니다.
    • NewPartitions - OUT_OF_RANGE 응답으로 업데이트된 파티셔닝 정보를 제공합니다.
    • ChangeStreamContinuationTokens - 동일한 위치에서 새 ReadChangeStream 요청을 재개하기 위해 사용되는 토큰 목록입니다. NewPartition당 하나입니다.
  • Heartbeat - 스트림 상태를 체크하는 데 사용할 수 있는 정보가 포함된 주기적 메시지.

    • EstimatedLowWatermark - 지정된 파티션의 낮은 워터마크에 대한 추정치.
    • ContinuationToken - 현재 위치에서 지정된 파티션의 스트리밍을 재개하는 토큰.

데이터 변경 레코드 콘텐츠

변경 내역 레코드에 대한 자세한 내용은 데이터 변경 레코드에 포함된 항목을 참조하세요.

파티션의 변경사항 처리

테이블 파티션이 변경될 때 ReadChangeStream 요청은 새 파티션에서 스트리밍을 재개하는 데 필요한 정보와 함께 CloseStream 메시지를 반환합니다.

분할에는 여러 개의 새 파티션과 각 파티션에 해당하는 ContinuationToken이 포함됩니다. 동일한 위치에서 새 파티션 스트리밍을 재개하려면 해당 토큰과 함께 각각의 새 파티션에 대해 새로운 ReadChangeStream 요청을 수행합니다.

예를 들어 [A,C) 파티션을 스트리밍할 때 2개의 파티션 [A,B)[B,C)로 분할될 경우 다음 순서로 이벤트가 발생할 수 있습니다.

ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
    ChangeStreamContinuationTokens = List(foo, bar)
)

같은 위치에서 각 파티션의 스트리밍을 재개하려면 다음과 같은 ReadChangeStreamQuery 요청을 보냅니다.

ReadChangeStreamQuery queryAB =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, B))
        .continuationTokens(List.of(foo));

ReadChangeStreamQuery queryBC =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(B, C))
        .continuationTokens(List.of(bar));

병합의 경우 동일한 파티션에서 재개하려면 병합된 파티션의 각 토큰을 포함하는 새 ReadChangeStream 요청을 보내야 합니다.

예를 들어 2개 파티션 [A,B)[B,C)를 스트리밍할 때 [A,C) 파티션으로 병합되면 다음 순서로 이벤트가 발생할 수 있습니다.

ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(foo)
)

ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(bar)
)

같은 위치에서 스트리밍 파티션 [A, C)를 재개하려면 다음과 같이 ReadChangeStreamQuery를 전송합니다.

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, C))
        .continuationTokens(List.of(foo, bar));

다음 단계