Dataflow를 사용하여 변경 내역 연결 빌드

이 페이지에서는 변경 내역을 사용해 Spanner 변경 데이터를 소비 및 전달하는 Dataflow 파이프라인을 만드는 방법을 보여줍니다. 이 페이지의 예시 코드를 사용하여 커스텀 파이프라인을 빌드할 수 있습니다.

핵심 개념

다음은 변경 내역에 대한 Dataflow 파이프라인의 핵심 개념입니다.

Dataflow

Dataflow는 스트림 및 일괄 처리를 모두 지원하는 서버리스 방식의 빠른 비용 효과적인 서비스입니다. 오픈소스 Apache Beam 라이브러리를 사용하여 기록된 작업을 처리할 때 이동성을 제공하고 인프라 프로비저닝 및 클러스터 관리를 자동화합니다. Dataflow는 변경 내역에서 읽을 때 약 6초의 지연 시간으로 거의 실시간에 가까운 스트리밍을 제공합니다.

Dataflow를 사용하면 변경 내역 쿼리를 위해 Spanner API를 통해 추상화를 제공하는 SpannerIO 커넥터로 Spanner 변경 내역을 소비할 수 있습니다. 이 커넥터를 사용하면 Spanner API를 직접 사용할 때 필요한 변경 내역 파티션 수명 주기를 관리할 필요가 없습니다. 이 커넥터는 데이터 변경 레코드 스트림을 제공하므로 API 세부정보 및 동적 변경 스트림 파티션 나누기 대신 애플리케이션 논리에 더 집중할 수 있게 해줍니다. 변경 내역 데이터를 읽어야 하는 대부분의 환경에서는 Spanner API 대신 SpannerIO 커넥터를 사용하는 것이 좋습니다.

Dataflow 템플릿은 일반적인 사용 사례를 구현하는 사전 빌드된 Dataflow 파이프라인입니다. 개요는 Dataflow 템플릿을 참조하세요.

Dataflow 파이프라인

Spanner 변경 내역 Dataflow 파이프라인은 4개의 기본 부분으로 구성됩니다.

  1. 변경 스트림이 있는 Spanner 데이터베이스
  2. SpannerIO 커넥터
  3. 사용자 정의 변환 및 싱크
  4. 싱크 I/O 작성자

이미지

각 부분에 대해서는 아래에서 자세히 설명합니다.

Spanner 변경 스트림

변경 스트림을 만드는 방법은 변경 스트림 만들기를 참조하세요.

Apache Beam SpannerIO 커넥터

이것이 앞에서 설명한 SpannerIO 커넥터입니다. 데이터 변경 레코드의 PCollection을 파이프라인의 후반 단계로 내보내는 소스 I/O 커넥터입니다. 내보낸 각 데이터 변경 레코드의 이벤트 시간이 커밋 타임스탬프가 됩니다. 내보낸 레코드는 순서가 지정되지 않으며, SpannerIO 커넥터는 늦은 레코드가 없도록 합니다.

Dataflow는 변경 내역을 다룰 때 체크포인트를 사용합니다. 따라서 각 작업자는 추가 처리를 위해 변경사항을 전송하기 전에 변경사항을 버퍼링하는 동안 최대 5초 동안 대기할 수 있습니다. 약 6초의 지연 시간이 예상됩니다.

사용자 정의 변환

사용자 정의 변환은 사용자가 Dataflow 파이프라인 내에서 데이터 처리를 집계, 변환, 수정할 수 있게 해줍니다. 일반적인 사용 사례는 개인 식별 정보를 제거하고, 다운스트림 데이터 형식 요구사항을 충족하고, 정렬을 수행하는 것입니다. 변환에 대한 프로그래밍 가이드는 공식 Apache Beam 문서를 참조하세요.

Apache Beam 싱크 I/O 작성자

Apache Beam에는 Dataflow 파이프라인을 BigQuery와 같은 데이터 싱크에 기록하는 데 사용될 수 있는 기본 제공 I/O 변환이 포함되어 있습니다. 대부분의 일반 데이터 싱크가 기본적으로 지원됩니다.

Dataflow 템플릿

Dataflow 템플릿은 Google Cloud Console, Google Cloud CLI, Rest API 호출을 통해 일반 사용 사례를 위해 사전 빌드된 Docker 이미지를 기반으로 Dataflow 작업을 쉽게 만들 수 있게 해줍니다.

Spanner 변경 내역의 경우 3개의 Dataflow Flex 템플릿이 제공됩니다.

Dataflow 파이프라인 빌드

이 섹션에서는 커넥터의 초기 구성을 설명하고 Spanner 변경 내역 기능을 사용한 일반 통합 샘플을 제공합니다.

이러한 단계를 수행하기 위해서는 Dataflow용 Java 개발 환경이 필요합니다. 자세한 내용은 Java를 사용하여 Dataflow 파이프라인 만들기를 참조하세요.

변경 내역 만들기

변경 내역을 만드는 방법은 변경 내역 만들기를 참조하세요. 다음 단계를 계속하려면 변경 내역이 구성된 Spanner 데이터베이스가 있어야 합니다.

세분화된 액세스 제어 권한 부여

세분화된 액세스 제어 사용자가 Dataflow 작업을 실행할 것으로 예상되는 경우 변경 내역에 대한 SELECT 권한과 변경 내역의 테이블 값 함수에 대한 EXECUTE 권한이 있는 데이터베이스 역할에 대한 액세스 권한이 사용자에게 부여되었는지 확인합니다. 또한 주 구성원이 SpannerIO 구성 또는 Dataflow Flex 템플릿에서 데이터베이스 역할을 지정하는지 확인합니다.

자세한 내용은 세분화된 액세스 제어 정보를 참조하세요.

SpannerIO 커넥터를 종속 항목으로 추가

Apache Beam SpannerIO 커넥터는 Cloud Spanner API를 통해 직접 변경 내역을 소비할 때의 복잡성을 캡슐화하여 변경 내역 데이터 레코드의 PCollection을 파이프라인의 후반 단계로 내보냅니다.

이러한 객체는 사용자의 Dataflow 파이프라인에 있는 다른 단계에 소비될 수 있습니다. 변경 스트림 통합은 SpannerIO 커넥터의 일부입니다. SpannerIO 커넥터를 사용할 수 있으려면 종속 항목을 pom.xml 파일에 추가할 수 있어야 합니다.

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam-version}</version> <!-- available from version 2.38.0 -->
</dependency>

메타데이터 데이터베이스 만들기

커넥터는 Apache Beam 파이프라인을 실행할 때 각 파티션을 추적해야 합니다. 초기화 중 커넥터로 생성된 Spanner 테이블에 이 메타데이터를 유지합니다. 커넥터를 구성할 때 이 테이블이 생성되는 데이터베이스를 지정합니다.

변경 내역 권장사항에 설명된 대로 커넥터가 애플리케이션 데이터베이스를 사용하여 메타데이터 테이블을 저장하도록 허용하는 대신 이 목적을 위해 새 데이터베이스를 만드는 것이 좋습니다.

SpannerIO 커넥터를 사용하는 Dataflow 작업 소유자는 이 메타데이터 데이터베이스가 있는 다음 IAM 권한 집합이 있어야 합니다.

  • spanner.databases.updateDdl
  • spanner.databases.beginReadOnlyTransaction
  • spanner.databases.beginOrRollbackReadWriteTransaction
  • spanner.databases.read
  • spanner.databases.select
  • spanner.databases.write
  • spanner.sessions.create
  • spanner.sessions.get

커넥터 구성

Spanner 변경 내역 커넥터는 다음과 같이 구성될 수 있습니다.

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");    // Needed for fine-grained access control only

Timestamp startTime = Timestamp.now();
Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(
   startTime.getSeconds() + (10 * 60),
   startTime.getNanos()
);

SpannerIO
  .readChangeStream()
  .withSpannerConfig(spannerConfig)
  .withChangeStreamName("my-change-stream")
  .withMetadataInstance("my-meta-instance-id")
  .withMetadataDatabase("my-meta-database-id")
  .withMetadataTable("my-meta-table-name")
  .withRpcPriority(RpcPriority.MEDIUM)
  .withInclusiveStartAt(startTime)
  .withInclusiveEndAt(endTime);

다음은 readChangeStream() 옵션에 대한 설명입니다.

Spanner 구성(필수)

변경 스트림을 만들고 쿼리를 수행하는 데 사용되는 프로젝트, 인스턴스, 데이터베이스를 구성하기 위해 사용됩니다. 원하는 경우 Dataflow 작업을 실행하는 IAM 주 구성원이 세분화된 액세스 제어 사용자인 경우 사용할 데이터베이스 역할도 지정합니다. 이 작업으로 이 데이터베이스 역할이 변경 내역에 대한 액세스를 맡습니다. 자세한 내용은 세분화된 액세스 제어 정보를 참조하세요.

변경 스트림 이름(필수)

이 이름은 변경 스트림을 고유하게 식별합니다. 여기에 지정한 이름은 변경 스트림을 만들 때 사용되는 이름과 동일해야 합니다.

메타데이터 인스턴스 ID(선택사항)

변경 스트림 API 데이터 소비를 제어하기 위해 커넥터에서 사용되는 메타데이터를 저장하기 위한 인스턴스입니다.

메타데이터 데이터베이스 ID(필수)

변경 스트림 API 데이터 소비를 제어하기 위해 커넥터에 사용되는 메타데이터를 저장하기 위한 데이터베이스입니다.

메타데이터 테이블 이름(선택사항)

기존 파이프라인을 업데이트할 때만 사용해야 합니다.

커넥터에 사용되는 기존 메타데이터 테이블 이름입니다. 변경 스트림 API 데이터 소비를 제어하기 위해 커넥터가 메타데이터를 저장하는 데 사용됩니다. 이 옵션을 생략하면 Spanner가 커넥터 초기화 시 생성된 이름으로 새 테이블을 만듭니다.

RPC 우선순위(선택사항)

변경 스트림 쿼리에 사용할 요청 우선순위입니다. 이 매개변수를 생략하면 high priority가 사용됩니다.

InclusiveStartAt(필수)

지정된 타임스탬프의 변경사항이 호출자에게 반환됩니다.

InclusiveEndAt(선택사항)

지정된 타임스탬프까지의 변경사항이 호출자에게 반환됩니다. 이 매개변수를 생략하면 변경사항이 무제한 전송됩니다.

변경 데이터 처리를 위한 변환 및 싱크 추가

이전 단계가 완료되면 구성된 SpannerIO 커넥터가 DataChangeRecord 객체의 PCollection을 내보낼 준비가 됩니다. 이러한 스트림 데이터를 여러 방식으로 처리하는 몇 가지 샘플 파이프라인 구성을 보려면 변환 및 싱크 예시를 참조하세요.

SpannerIO 커넥터로 전송되는 변경 스트림 레코드는 순서가 지정되지 않습니다. PCollection에서 순서 지정이 보장되지 않기 때문입니다. 순서가 지정된 스트림이 필요하면 파이프라인에서 레코드를 변환으로 그룹화하고 정렬해야 합니다. 샘플: 키별 순서 지정을 참조하세요. 트랜잭션 ID와 같은 레코드 필드를 기준으로 레코드를 정렬하도록 이 샘플을 확장할 수 있습니다.

변환 및 싱크 예시

자체 변환을 정의하고 데이터를 기록할 싱크를 지정할 수 있습니다. Apache Beam 문서에서는 I/O 커넥터를 사용하여 데이터를 외부 시스템에 기록하는 것 외에도 적용 가능한 무수히 많은 변환을 보여줍니다.

샘플: 키별 순서 지정

이 코드 샘플은 커밋 타임스탬프에 따라 순서가 지정되고 Dataflow 커넥터를 사용하여 기본 키로 그룹화된 데이터 변경 레코드를 내보냅니다.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new BreakRecordByModFn()))
  .apply(ParDo.of(new KeyByIdFn()))
  .apply(ParDo.of(new BufferKeyUntilOutputTimestamp()))
  // Subsequent processing goes here

이 코드 샘플은 상태 및 타이머를 사용하여 각 키에 대해 레코드를 버퍼링하고, 타이머 만료 시간을 사용자가 구성한 이후 시간 T로 설정합니다(BufferKeyUntilOutputTimestamp 함수에 정의됨). Dataflow 워터마크가 T 시간을 지날 때 이 코드는 버퍼에서 타임스탬프가 T보다 작은 모든 레코드를 플러시하고, 커밋 타임스탬프에 따라 이러한 레코드의 순서를 지정하고, 다음과 같은 키-값 쌍을 출력합니다.

  • 이 키는 입력 키입니다. 즉, 배열 크기가 1,000인 버킷에 해시된 기본 키입니다.
  • 값은 키에 대해 버퍼링된 순서가 지정된 데이터 변경 레코드입니다.

각 키마다 다음이 보장됩니다.

  • 타이머가 만료 타임스탬프 순서대로 작동합니다.
  • 다운스트림 단계에서는 생성된 것과 동일한 순서로 요소를 수신할 수 있습니다.

예를 들어 값이 100인 키가 있다고 가정해보세요. 타이머는 각각 T1T10에서 작동하고 각 타임스탬프에서 데이터 변경 레코드 번들을 생성합니다. T1에 출력된 데이터 변경 레코드가 T10에 출력된 데이터 변경 레코드보다 이전에 생성되었으므로, T1에 출력되는 데이터 변경 레코드도 T10에 출력된 데이터 변경 레코드보다 이전에 있는 다음 단계에서 수신되도록 보장됩니다. 이 메커니즘은 다운스트림 처리를 위해 기본 키별로 엄격한 커밋 타임스탬프 순서 지정을 보장합니다.

이 프로세스는 파이프라인이 종료되고 모든 데이터 변경 레코드가 처리될 때까지 반복됩니다. 또는 종료 시간이 지정되지 않은 경우에는 무한대로 반복됩니다.

이 코드 샘플에는 키별 순서 지정을 수행하기 위해 기간 대신 상태와 타이머가 사용됩니다. 이렇게 하는 이유는 기간의 경우 순서대로 처리가 보장되지 않기 때문입니다. 즉, 이전 기간이 최근 기간보다 나중에 처리될 수 있어서 순서대로 처리가 되지 않을 수 있습니다.

BreakRecordByModFn

각 데이터 변경 레코드에는 여러 모드가 포함될 수 있습니다. 각 모드는 단일 기본 키 값에 대한 삽입, 업데이트, 삭제를 나타냅니다. 이 함수는 모드별로 하나씩 각 데이터 변경 레코드를 개별 데이터 변경 레코드로 구분합니다.

private static class BreakRecordByModFn extends DoFn<DataChangeRecord,
                                                     DataChangeRecord>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record, OutputReceiver<DataChangeRecord>
    outputReceiver) {
    record.getMods().stream()
      .map(
          mod ->
              new DataChangeRecord(
                  record.getPartitionToken(),
                  record.getCommitTimestamp(),
                  record.getServerTransactionId(),
                  record.isLastRecordInTransactionInPartition(),
                  record.getRecordSequence(),
                  record.getTableName(),
                  record.getRowType(),
                  Collections.singletonList(mod),
                  record.getModType(),
                  record.getValueCaptureType(),
                  record.getNumberOfRecordsInTransaction(),
                  record.getNumberOfPartitionsInTransaction(),
                  record.getTransactionTag(),
                  record.isSystemTransaction(),
                  record.getMetadata()))
      .forEach(outputReceiver::output);
  }
}

KeyByIdFn

이 함수는 DataChangeRecord를 가져와서 정수 값으로 해시된 Spanner 기본 키로 입력된 DataChangeRecord를 출력합니다.

private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  // NUMBER_OF_BUCKETS should be configured by the user to match their key cardinality
  // Here, we are choosing to hash the Spanner primary keys to a bucket index, in order to have a deterministic number
  // of states and timers for performance purposes.
  // Note that having too many buckets might have undesirable effects if it results in a low number of records per bucket
  // On the other hand, having too few buckets might also be problematic, since many keys will be contained within them.
  private static final int NUMBER_OF_BUCKETS = 1000;

  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    int hashCode = (int) record.getMods().get(0).getKeysJson().hashCode();
    // Hash the received keys into a bucket in order to have a
    // deterministic number of buffers and timers.
    String bucketIndex = String.valueOf(hashCode % NUMBER_OF_BUCKETS);

    outputReceiver.output(KV.of(bucketIndex, record));
  }
}

BufferKeyUntilOutputTimestamp

타이머와 버퍼는 키별로 적용됩니다. 이 함수는 워터마크가 버퍼링된 데이터 변경 레코드를 출력하려는 타임스탬프를 지날 때까지 각 데이터 변경 레코드를 버퍼링합니다.

이 코드는 루핑 타이머를 사용하여 버퍼를 플러시할 시간을 결정합니다.

  1. 키에 대한 데이터 변경 레코드가 처음 발견되면 데이터 변경 레코드의 커밋 타임스탬프와 incrementIntervalSeconds(사용자 구성 가능 옵션)를 더한 시간에 작동하도록 타이머를 설정합니다.
  2. 타이머가 작동하면 타임스탬프가 타이머 만료 시간보다 이전인 버퍼의 모든 데이터 변경 레코드를 recordsToOutput에 추가합니다. 타임스탬프가 타이머 만료 시간보다 크거나 같은 데이터 변경 레코드가 버퍼에 포함된 경우 이를 출력하는 대신 이러한 데이터 변경 레코드를 다시 버퍼에 추가합니다. 그런 후 현재 타이머의 만료 시간과 incrementIntervalInSeconds를 더한 시간으로 다음 타이머를 설정합니다.
  3. recordsToOutput이 비어 있지 않으면 함수가 커밋 타임스탬프 및 트랜잭션 ID에 따라 recordsToOutput에서 데이터 변경 레코드 순서를 지정한 후 이를 출력합니다.
private static class BufferKeyUntilOutputTimestamp extends
    DoFn<KV<String, DataChangeRecord>, KV<String, Iterable<DataChangeRecord>>>  {
  private static final Logger LOG =
      LoggerFactory.getLogger(BufferKeyUntilOutputTimestamp.class);

  private final long incrementIntervalInSeconds = 2;

  private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
    this.incrementIntervalInSeconds = incrementIntervalInSeconds;
  }

  @SuppressWarnings("unused")
  @TimerId("timer")
  private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("keyString")
  private final StateSpec<ValueState<String>> keyString =
      StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void process(
      @Element KV<String, DataChangeRecord> element,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    buffer.add(element.getValue());

    // Only set the timer if this is the first time we are receiving a data change
    // record with this key.
    String elementKey = keyString.read();
    if (elementKey == null) {
      Instant commitTimestamp =
          new Instant(element.getValue().getCommitTimestamp().toSqlTimestamp());
      Instant outputTimestamp =
          commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
      timer.set(outputTimestamp);
      keyString.write(element.getKey());
    }
  }

  @OnTimer("timer")
  public void onExpiry(
      OnTimerContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    if (!buffer.isEmpty().read()) {
      String elementKey = keyString.read();

      final List<DataChangeRecord> records =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .collect(Collectors.toList());
      buffer.clear();

      List<DataChangeRecord> recordsToOutput = new ArrayList<>();
      for (DataChangeRecord record : records) {
        Instant recordCommitTimestamp =
            new Instant(record.getCommitTimestamp().toSqlTimestamp());
        final String recordString =
            record.getMods().get(0).getNewValuesJson().isEmpty()
                ? "Deleted record"
                : record.getMods().get(0).getNewValuesJson();
        // When the watermark passes time T, this means that all records with
        // event time < T have been processed and successfully committed. Since the
        // timer fires when the watermark passes the expiration time, we should
        // only output records with event time < expiration time.
        if (recordCommitTimestamp.isBefore(context.timestamp())) {
          LOG.info(
             "Outputting record with key {} and value {} at expiration " +
             "timestamp {}",
              elementKey,
              recordString,
              context.timestamp().toString());
          recordsToOutput.add(record);
        } else {
          LOG.info(
              "Expired at {} but adding record with key {} and value {} back to " +
              "buffer due to commit timestamp {}",
              context.timestamp().toString(),
              elementKey,
              recordString,
              recordCommitTimestamp.toString());
          buffer.add(record);
        }
      }

      // Output records, if there are any to output.
      if (!recordsToOutput.isEmpty()) {
        // Order the records in place, and output them. The user would need
        // to implement DataChangeRecordComparator class that sorts the
        // data change records by commit timestamp and transaction ID.
        Collections.sort(recordsToOutput, new DataChangeRecordComparator());
        context.outputWithTimestamp(
            KV.of(elementKey, recordsToOutput), context.timestamp());
        LOG.info(
            "Expired at {}, outputting records for key {}",
            context.timestamp().toString(),
            elementKey);
      } else {
        LOG.info("Expired at {} with no records", context.timestamp().toString());
      }
    }

    Instant nextTimer = context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
    if (buffer.isEmpty() != null && !buffer.isEmpty().read()) {
      LOG.info("Setting next timer to {}", nextTimer.toString());
      timer.set(nextTimer);
    } else {
      LOG.info(
          "Timer not being set since the buffer is empty: ");
      keyString.clear();
    }
  }
}

트랜잭션 순서 지정

트랜잭션 ID 및 커밋 타임스탬프에 따라 순서를 지정하도록 이 파이프라인을 변경할 수 있습니다. 이렇게 하려면 각 Spanner 키 대신 각 트랜잭션 ID/커밋 타임스탬프 쌍에 따라 레코드를 버퍼링합니다. 이렇게 하려면 KeyByIdFn에서 코드를 수정해야 합니다.

샘플: 트랜잭션 조합

이 코드 샘플은 데이터 변경 레코드를 읽고, 동일한 트랜잭션에 속하는 모든 데이터 변경 레코드를 단일 요소로 조합하고, 이 요소를 출력합니다. 이 샘플 코드로 출력되는 트랜잭션은 커밋 타임스탬프에 따라 순서가 지정되지 않습니다.

이 코드 샘플은 버퍼를 사용하여 데이터 변경 레코드의 트랜잭션을 조합합니다. 트랜잭션에 속하는 데이터 변경 레코드가 처음 수신되면 해당 트랜잭션에 속하는 예상되는 데이터 변경 레코드 수를 기술하는 데이터 변경 레코드의 numberOfRecordsInTransaction 필드를 읽습니다. 버퍼링된 레코드 수가 번들로 묶인 데이터 변경 레코드를 출력하는 numberOfRecordsInTransaction과 일치할 때까지 해당 트랜잭션에 속하는 데이터 변경 레코드를 버퍼링합니다.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new KeyByTransactionIdFn()))
  .apply(ParDo.of(new TransactionBoundaryFn()))
  // Subsequent processing goes here

KeyByTransactionIdFn

이 함수는 DataChangeRecord를 가져와서 트랜잭션 ID에 따라 입력된 DataChangeRecord를 출력합니다.

private static class KeyByTransactionIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    outputReceiver.output(KV.of(record.getServerTransactionId(), record));
  }
}

TransactionBoundaryFn

TransactionBoundaryFnKeyByTransactionIdFn에서 {TransactionId, DataChangeRecord}의 수신된 키-값 쌍을 버퍼링하고 TransactionId를 기준으로 이를 그룹으로 버퍼링합니다. 버퍼링된 레코드 수가 전체 트랜잭션에 포함된 레코드 수와 동일하면 이 함수가 레코드 시퀀스에 따라 그룹에서 DataChangeRecord 객체를 정렬하고 {CommitTimestamp, TransactionId}, Iterable<DataChangeRecord>의 키-값 쌍을 출력합니다.

여기에서는 SortKey{CommitTimestamp, TransactionId} 쌍을 나타내는 사용자 정의 클래스라고 가정합니다. SortKey샘플 구현을 참조하세요.

private static class TransactionBoundaryFn extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>>  {
  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @StateId("count") ValueState<Integer> countState) {
    final KV<String, DataChangeRecord> element = context.element();
    final DataChangeRecord record = element.getValue();

    buffer.add(record);
    int count = (countState.read() != null ? countState.read() : 0);
    count = count + 1;
    countState.write(count);

    if (count == record.getNumberOfRecordsInTransaction()) {
      final List<DataChangeRecord> sortedRecords =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
              .collect(Collectors.toList());

      final Instant commitInstant =
          new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp()
              .getTime());
      context.outputWithTimestamp(
          KV.of(
              new SortKey(sortedRecords.get(0).getCommitTimestamp(),
                          sortedRecords.get(0).getServerTransactionId()),
              sortedRecords),
          commitInstant);
      buffer.clear();
      countState.clear();
    }
  }
}

샘플: 트랜잭션 태그로 필터링

사용자 데이터를 수정하는 트랜잭션에 태그를 지정하면 해당 태그와 유형이 DataChangeRecord의 일부로 저장됩니다. 다음 예는 사용자 정의 트랜잭션 태그와 시스템 태그를 기반으로 변경 내역 레코드를 필터링하는 방법을 보여줍니다.

my-tx-tag의 사용자 정의 태그 필터링:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           !record.isSystemTransaction()
           && record.getTransactionTag().equalsIgnoreCase("my-tx-tag")))
  // Subsequent processing goes here

시스템 태그 필터링/TTL 감사:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           record.isSystemTransaction()
           && record.getTransactionTag().equals("RowDeletionPolicy")))
  // Subsequent processing goes here

샘플: 전체 행 가져오기

다음 예시에서는 다음과 같이 정의된 Singer라는 Spanner 테이블 작업을 보여줍니다.

CREATE TABLE Singers (
  SingerId INT64 NOT NULL,
  FirstName STRING(1024),
  LastName STRING(1024)
) PRIMARY KEY (SingerId);

변경 내역의 기본 OLD_AND_NEW_VALUES 값 캡처 모드에서 Spanner 행이 업데이트되면 수신된 데이터 변경 레코드에 변경된 열만 포함됩니다. 추적되지만 변경되지 않은 열은 레코드에 포함되지 않습니다. 모드의 기본 키를 사용하면 변경되지 않은 데이터를 가져오거나 전체 행을 검색하기 위해 데이터 변경 레코드의 커밋 타임스탬프에서 Spanner 스냅샷 읽기를 수행할 수 있습니다.

스냅샷 읽기가 성공하려면 데이터베이스 보관 정책을 변경 스트림 보관 정책보다 크거나 같은 값으로 변경해야 할 수 있습니다.

또한 NEW_ROW 값 캡처 유형을 사용하는 것이 좋습니다. 이 방법은 기본적으로 행의 모든 추적 열을 반환하며 Spanner의 추가 스냅샷 읽기가 필요하지 않기 때문에 더 효율적입니다.

SpannerConfig spannerConfig = SpannerConfig
   .create()
   .withProjectId("my-project-id")
   .withInstanceId("my-instance-id")
   .withDatabaseId("my-database-id")
   .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
   .apply(SpannerIO
       .readChangeStream()
       .withSpannerConfig(spannerConfig)
       // Assume we have a change stream "my-change-stream" that watches Singers table.
       .withChangeStreamName("my-change-stream")
       .withMetadataInstance("my-metadata-instance-id")
       .withMetadataDatabase("my-metadata-database-id")
       .withInclusiveStartAt(Timestamp.now()))
   .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
   // Subsequent processing goes here

ToFullRowJsonFn

이 변환은 수신된 각 레코드의 커밋 타임스탬프에서 비활성 읽기를 수행하고 전체 행을 JSON에 매핑합니다.

public class ToFullRowJsonFn extends DoFn<DataChangeRecord, String> {
 // Since each instance of this DoFn will create its own session pool and will
 // perform calls to Spanner sequentially, we keep the number of sessions in
 // the pool small. This way, we avoid wasting resources.
 private static final int MIN_SESSIONS = 1;
 private static final int MAX_SESSIONS = 5;
 private final String projectId;
 private final String instanceId;
 private final String databaseId;

 private transient DatabaseClient client;
 private transient Spanner spanner;

 public ToFullRowJsonFn(SpannerConfig spannerConfig) {
   this.projectId = spannerConfig.getProjectId().get();
   this.instanceId = spannerConfig.getInstanceId().get();
   this.databaseId = spannerConfig.getDatabaseId().get();
 }

 @Setup
 public void setup() {
   SessionPoolOptions sessionPoolOptions = SessionPoolOptions
      .newBuilder()
      .setMinSessions(MIN_SESSIONS)
      .setMaxSessions(MAX_SESSIONS)
      .build();
   SpannerOptions options = SpannerOptions
       .newBuilder()
       .setProjectId(projectId)
       .setSessionPoolOption(sessionPoolOptions)
       .build();
   DatabaseId id = DatabaseId.of(projectId, instanceId, databaseId);
   spanner = options.getService();
   client = spanner.getDatabaseClient(id);
 }

 @Teardown
 public void teardown() {
   spanner.close();
 }

 @ProcessElement
 public void process(
   @Element DataChangeRecord element,
   OutputReceiver<String> output) {
   com.google.cloud.Timestamp commitTimestamp = element.getCommitTimestamp();
   element.getMods().forEach(mod -> {
     JSONObject keysJson = new JSONObject(mod.getKeysJson());
     JSONObject newValuesJson = new JSONObject(mod.getNewValuesJson());
     ModType modType = element.getModType();
     JSONObject jsonRow = new JSONObject();
     long singerId = keysJson.getLong("SingerId");
     jsonRow.put("SingerId", singerId);
     if (modType == ModType.INSERT) {
       // For INSERT mod, get non-primary key columns from mod.
       jsonRow.put("FirstName", newValuesJson.get("FirstName"));
       jsonRow.put("LastName", newValuesJson.get("LastName"));
     } else if (modType == ModType.UPDATE) {
       // For UPDATE mod, get non-primary key columns by doing a snapshot read using the primary key column from mod.
       try (ResultSet resultSet = client
         .singleUse(TimestampBound.ofReadTimestamp(commitTimestamp))
         .read(
           "Singers",
           KeySet.singleKey(com.google.cloud.spanner.Key.of(singerId)),
             Arrays.asList("FirstName", "LastName"))) {
         if (resultSet.next()) {
           jsonRow.put("FirstName", resultSet.isNull("FirstName") ?
             JSONObject.NULL : resultSet.getString("FirstName"));
           jsonRow.put("LastName", resultSet.isNull("LastName") ?
             JSONObject.NULL : resultSet.getString("LastName"));
         }
       }
     } else {
       // For DELETE mod, there is nothing to do, as we already set SingerId.
     }

     output.output(jsonRow.toString());
   });
 }
}

이 코드는 전체 행 가져오기를 수행하기 위해 Spanner 데이터베이스 클라이언트를 만들고 일부 세션만 포함하도록 세션 풀을 구성하여 ToFullRowJsonFn의 한 인스턴스에서 순차적으로 읽기를 수행합니다. Dataflow는 각각 자체 클라이언트 풀이 포함된 상태로 이 함수의 여러 인스턴스가 생성되도록 보장합니다.

샘플: Spanner와 Pub/Sub

이 시나리오에서는 그룹화 또는 집계 없이 호출자가 Pub/Sub로 레코드를 가능한 한 빠르게 스트리밍합니다. 추가 처리를 위해 Spanner 테이블에 삽입되는 모든 신규 행을 Pub/Sub에 스트리밍하므로 다운스트림 처리를 트리거하는 데 적합합니다.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(PubsubIO.writeStrings().to("my-topic"));

Pub/Sub 싱크는 exactly-once이라는 시맨틱스를 보장하도록 구성될 수 있습니다.

샘플: Spanner와 Cloud Storage

이 시나리오에서는 호출자가 지정된 기간 내에 모든 레코드를 그룹화하고 이 그룹을 별개의 Cloud Storage 파일에 저장합니다. Spanner 보관 기간에 의존하지 않는 분석 및 특정 시점 아카이브에 적합합니다.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
  .apply(TextIO
    .write()
    .to("gs://my-bucket/change-stream-results-")
    .withSuffix(".txt")
    .withWindowedWrites()
    .withNumShards(1));

Cloud Storage 싱크는 기본적으로 단 한 번이라는 시맨틱스를 제공합니다. 추가 처리를 사용하여 exactly-once 시맨틱스를 갖도록 수정할 수 있습니다.

이 사용 사례를 위한 Dataflow 템플릿도 제공됩니다. Cloud Storage에 변경 내역 연결을 참조하세요.

샘플: Spanner와 BigQuery(원장 테이블)

여기에서는 호출자가 변경 레코드를 BigQuery로 스트리밍합니다. 각 데이터 변경 레코드는 BigQuery에서 하나의 행으로 반영됩니다. 이 방식은 분석에 적합합니다. 이 코드는 앞에서 전체 행 가져오기 섹션에 설명된 함수를 사용하여 레코드의 전체 행을 검색하고 이를 BigQuery에 기록합니다.

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(spannerConfig)
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
  .apply(BigQueryIO
    .<String>write()
    .to("my-bigquery-table")
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
    .withSchema(new TableSchema().setFields(Arrays.asList(
      new TableFieldSchema()
        .setName("SingerId")
        .setType("INT64")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("FirstName")
        .setType("STRING")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("LastName")
        .setType("STRING")
        .setMode("REQUIRED")
    )))
    .withAutoSharding()
    .optimizedWrites()
    .withFormatFunction((String element) -> {
      ObjectMapper objectMapper = new ObjectMapper();
      JsonNode jsonNode = null;
      try {
        jsonNode = objectMapper.readTree(element);
      } catch (IOException e) {
        e.printStackTrace();
      }
      return new TableRow()
        .set("SingerId", jsonNode.get("SingerId").asInt())
        .set("FirstName", jsonNode.get("FirstName").asText())
        .set("LastName", jsonNode.get("LastName").asText());
    }
  )
);

BigQuery 싱크는 기본적으로 at-least-once 시맨틱스를 제공합니다. 추가 처리를 사용하여 exactly-once 시맨틱스를 갖도록 수정할 수 있습니다.

이 사용 사례를 위한 Dataflow 템플릿도 제공됩니다. BigQuery에 변경 내역 연결을 참조하세요.

파이프라인 모니터링

변경 스트림 Dataflow 파이프라인을 모니터링하는 데에는 두 가지 측정항목을 사용할 수 있습니다.

표준 Dataflow 측정항목

Dataflow에서는 데이터 최신 상태, 시스템 지연, 작업 처리량, 작업자 CPU 활용률 등 정상 작업 상태를 보장하기 위한 몇 가지 측정항목이 제공됩니다. 자세한 내용은 Dataflow 파이프라인에 Monitoring 사용을 참조하세요.

변경 내역 파이프라인에서는 시스템 지연 시간데이터 최신 상태라는 두 가지 기본 측정항목을 고려해야 합니다.

시스템 지연 시간은 데이터 항목이 처리되거나 처리를 기다리는 동안의 현재 최대 기간(초)을 알려줍니다.

데이터 최신 상태는 현재(실제 시간)와 출력 워터마크 사이의 기간(시간)을 보여줍니다. T 시간의 출력 워터마크는 이벤트 시간이 (엄밀히) T 이전인 모든 요소가 소비되도록 처리되었음을 나타냅니다. 즉, 데이터 최신 상태는 수신된 이벤트의 처리와 관련하여 파이프라인이 최신 상태인 정도를 측정합니다.

파이프라인에 제공되는 리소스가 부족하면 이러한 두 가지 측정항목으로 해당 효과를 확인할 수 있습니다. 항목이 처리될 때까지 기다리는 시간이 더 필요하면 시스템 지연 시간이 늘어납니다. 파이프라인이 수신되는 데이터 양을 따라 잡을 수 없으면 데이터 최신 상태도 늘어납니다.

커스텀 변경 스트림 측정항목

이러한 측정항목은 Cloud Monitoring에서 제공되며, 다음과 같습니다.

  • 레코드가 Spanner에서 커밋될 때와 커넥터에 의해 PCollection으로 전송될 때 사이의 버케팅된(히스토그램) 지연 시간. 이 측정항목을 사용하면 파이프라인에 있는 모든 성능(지연 시간) 문제를 확인할 수 있습니다.
  • 읽은 총 데이터 레코드 수. 커넥터에서 전송되는 레코드 수를 전반적으로 나타냅니다. 이 숫자는 기본 Spanner 데이터베이스에서의 쓰기 추세에 따라 계속해서 증가합니다.
  • 현재 읽고 있는 파티션 수. 읽기를 수행 중인 파티션은 항상 존재합니다. 이 숫자가 0이면 파이프라인에 오류가 발생했음을 나타냅니다.
  • 커넥터를 실행하는 동안의 총 쿼리 수. 파이프라인 실행 전반에 걸쳐 Spanner 인스턴스에 수행된 전체 변경 스트림 쿼리를 나타냅니다. 커넥터에서 Spanner 데이터베이스로의 로드를 예측하는 데 사용될 수 있습니다.

기존 파이프라인 업데이트

작업 호환성 검사가 통과할 경우 변경 내역을 처리하기 위해 SpannerIO 커넥터를 사용하는 실행 중인 파이프라인을 업데이트할 수 있습니다. 이를 위해서는 업데이트할 때 새 작업의 메타데이터 테이블 이름 매개변수를 명시적으로 설정해야 합니다. 업데이트 중인 작업에서 metadataTable 파이프라인 옵션의 값을 사용합니다.

Google 제공 Dataflow 템플릿을 사용하는 경우 spannerMetadataTableName 매개변수를 사용하여 테이블 이름을 설정합니다. 커넥터 구성에서 withMetadataTable(your-metadata-table-name) 메서드에 메타데이터 테이블을 명시적으로 사용하도록 기존 작업을 수정할 수도 있습니다. 이렇게 한 다음 Dataflow 문서에 있는 교체 작업 실행의 안내에 따라 실행 중인 작업을 업데이트할 수 있습니다.

변경 내역 및 Dataflow 권장사항

다음은 Dataflow를 사용하여 변경 내역 연결을 빌드하기 위한 몇 가지 권장사항입니다.

별도의 메타데이터 데이터베이스 사용

애플리케이션 데이터베이스를 사용하도록 구성하는 대신 SpannerIO 커넥터가 메타데이터 스토리지에 사용할 개별 데이터베이스를 만드는 것이 좋습니다.

자세한 내용은 개별 메타데이터 데이터베이스 고려를 참조하세요.

클러스터 크기 조정

Spanner 변경 내역 작업의 초기 작업자 수에 대한 기본 원칙은 하나의 작업자에 초당 1,000회 쓰기를 지정하는 것입니다. 이러한 예상 값은 각 트랜잭션의 크기, 단일 트랜잭션으로 생성되는 변경 내역 레코드 수, 파이프라인에서 사용되는 기타 변환, 집계, 싱크 등의 여러 요소에 따라 달라질 수 있습니다.

초기 리소스 구성을 마친 후에는 파이프라인이 정상 상태로 작동하도록 파이프라인 모니터링에 설명된 측정항목을 관리하는 것이 중요합니다. 초기 작업자 풀 크기를 여러 가지로 실험해보고 로드에 따른 파이프라인 처리 방식을 모니터링하고, 필요에 따라 노드 수를 늘리는 것이 좋습니다. CPU 활용률은 로드가 적절한지 추가 노드가 필요한지 확인하기 위한 주요 측정항목입니다.

알려진 제한사항

자동 확장

SpannerIO.readChangeStream을 포함하는 모든 파이프라인에 대한 자동 확장 지원에는 Apache Beam 2.39.0 이상이 필요합니다.

2.39.0 이전의 Apache Beam 버전을 사용할 경우 SpannerIO.readChangeStream을 포함하는 파이프라인은 수평 자동 확장에 설명된 대로 NONE과 같은 자동 확장 알고리즘을 명시적으로 지정해야 합니다.

자동 확장을 사용하는 대신 Dataflow 파이프라인을 수동으로 확장하려면 스트리밍 파이프라인 수동 확장을 참조하세요.

Runner V2

Spanner 변경 내역 커넥터에는 Dataflow Runner V2가 필요합니다. 실행 중 이를 수동으로 지정해야 하고, 그렇지 않으면 오류가 발생합니다. --experiments=use_unified_worker,use_runner_v2를 사용해서 작업을 구성하여 Runner V2를 지정할 수 있습니다.

스냅샷

Spanner 변경 내역 커넥터는 Dataflow 스냅샷을 지원하지 않습니다.

드레이닝

Spanner 변경 내역 커넥터는 작업 드레이닝을 지원하지 않습니다. 기존 작업 취소만 가능합니다.

또한 중지할 필요 없이 기존 파이프라인을 업데이트할 수 있습니다.

OpenCensus

OpenCensus를 사용하여 파이프라인을 모니터링하려면 0.28.3 이상 버전을 지정합니다.

파이프라인 시작 시 NullPointerException

Apache Beam 버전 2.38.0버그로 인해 특정 조건에서는 파이프라인을 시작할 때 NullPointerException이 발생할 수 있습니다. 이렇게 하면 작업이 시작되지 않고 대신 다음 오류 메시지가 표시됩니다.

java.lang.NullPointerException: null value in entry: Cloud Storage_PROJECT_ID=null

이 문제를 해결하려면 Apache Beam 버전 2.39.0 이상을 사용하거나 beam-sdks-java-core 버전을 수동으로 2.37.0으로 지정합니다.

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.37.0</version>
</dependency>

추가 정보