Dataflow を使用して変更ストリーム接続を構築する

このページでは、変更ストリームを使用して、Spanner の変更データの使用と転送を行う Dataflow パイプラインを作成する方法を説明します。このページのサンプルコードを使用して、カスタム パイプラインを構築できます。

基本コンセプト

変更ストリーム用の Dataflow パイプラインの基本コンセプトは次のとおりです。

Dataflow

Dataflow は、ストリーミングとバッチ処理をサポートする、サーバーレスで高速かつ費用対効果の高いサービスです。オープンソースの Apache Beam ライブラリを使用して作成された処理ジョブでポータビリティを実現し、インフラストラクチャのプロビジョニングとクラスタ管理を自動化します。Dataflow は、変更ストリームからの読み取り時に、準リアルタイムのストリーミングを提供します。

Dataflow では、SpannerIO コネクタを備えた Spanner 変更ストリームを使用できます。このコネクタは、変更ストリームをクエリするための Spanner API の抽象化を提供します。このコネクタを使用すると、Spanner API を直接使用する際に必要な変更ストリーム パーティションのライフサイクルを管理する必要がなくなります。コネクタによってデータ変更レコードのストリームが提供されるため、特定のアプリケーションの詳細や動的変更ストリームのパーティショニングよりも、アプリケーション ロジックに注力できます。変更ストリーム データを読み取る必要があるほとんどの場合で、Spanner API ではなく SpannerIO コネクタを使用することをおすすめします。

Dataflow テンプレートは、一般的なユースケースを実装できる事前構築された Dataflow パイプラインです。概要は、Dataflow テンプレートをご覧ください。

Dataflow パイプライン

Spanner 変更ストリーム Dataflow パイプラインは、次の 4 つの主要部分で構成されています。

  1. 変更ストリームがある Spanner データベース
  2. SpannerIO コネクタ
  3. ユーザー定義の変換とシンク
  4. Apache Beam シンク I/O 書き込み

画像

Spanner 変更ストリーム

変更ストリームの作成方法の詳細については、変更ストリームの作成をご覧ください。

Apache Beam SpannerIO コネクタ

これは、前述の Dataflow セクションで説明した SpannerIO コネクタです。また、これはデータ変更レコードの PCollection をパイプラインの後のステージに出力するソース I/O コネクタです。出力された各データ変更レコードのイベント時間は、commit タイムスタンプになります。出力されたレコードは順序付けがなく、SpannerIO コネクタは遅延レコードがないことを保証します。

変更ストリームを使用する場合、Dataflow はチェックポインティングを使用します。そのため、各ワーカーは、変更のバッファリングのために構成されたチェックポイント間隔まで待機してから、その変更を送信して処理を続行する場合があります。

ユーザー定義の変換

ユーザー定義の変換を使用すると、Dataflow パイプライン内で処理データを集計、変換、変更できます。この一般的なユースケースには、個人を特定できる情報の削除、ダウンストリーム データ形式要件への準拠、並べ替えがあります。変換に関するプログラミング ガイドについては、Apache Beam の公式ドキュメントをご覧ください。

Apache Beam シンク I/O 書き込み

Apache Beam には、Dataflow パイプラインから BigQuery などのデータシンクに書き込むための組み込みの I/O 変換が含まれています。多くの一般的なデータシンクがネイティブにサポートされています。

Dataflow テンプレート

Dataflow テンプレートを使用すると、Google Cloud コンソール、Google Cloud CLI、または REST API 呼び出しを使用して、一般的なユースケース用の事前に構築された Docker イメージに基づいて Dataflow ジョブを簡単に作成できます。

Spanner 変更ストリームには、次の 3 つの Dataflow Flex テンプレートがあります。

Dataflow テンプレートの IAM 権限を設定する

3 つの Flex テンプレートを一覧表示する Dataflow ジョブを作成する前に、次のサービス アカウントに必要な IAM 権限があることを確認します。

必要な IAM 権限がない場合は、ユーザー管理のワーカー サービス アカウントを指定して Dataflow ジョブを作成する必要があります。詳細については、Dataflow のセキュリティと権限をご覧ください。

必要な権限をすべて付与せずに Dataflow Flex テンプレートからジョブを実行しようとすると、ジョブが失敗して、結果ファイルの読み取りエラーまたはリソースに対する権限拒否エラーが発生することがあります。詳細については、Flex テンプレートのトラブルシューティングをご覧ください。

Dataflow パイプラインを構築する

このセクションでは、コネクタの初期構成について説明し、Spanner の変更ストリーム機能との一般的なインテグレーションのサンプルを示します。

以下の手順を行うには、Dataflow 用の Java 開発環境が必要です。詳細については、Java を使用して Dataflow パイプラインを作成するをご覧ください。

変更ストリームを作成

変更ストリームの作成方法の詳細については、変更ストリームの作成をご覧ください。次のステップに進むには、変更ストリームが構成された Spanner データベースが必要です。

きめ細かなアクセス制御権限を付与する

きめ細かいアクセス制御ユーザーが Dataflow ジョブを実行する場合は、ユーザーに変更ストリームの SELECT 権限と変更ストリームのテーブル値関数に対する EXECUTE 権限を持つデータベース ロールへのアクセス権が付与されていることを確認してください。また、プリンシパルが SpannerIO 構成または Dataflow Flex テンプレートでデータベース ロールを指定していることも確認します。

詳細については、細かいアクセス制御についてをご覧ください。

SpannerIO コネクタを依存関係として追加する

Apache Beam SpannerIO コネクタは、Cloud Spanner API を使用して変更ストリームを直接使用することによる複雑さをカプセル化し、変更ストリーム データレコードの PCollection をパイプラインの後のステージに出力します。

これらのオブジェクトは、ユーザーのデータフロー パイプラインの他のステージで使用できます。変更ストリーム インテグレーションは、SpannerIO コネクタの一部です。SpannerIO コネクタを使用可能にするには、依存関係を pom.xml ファイルに追加する必要があります。

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam-version}</version> <!-- available from version 2.38.0 -->
</dependency>

メタデータ データベースを作成する

コネクタは、Apache Beam パイプラインの実行時に各パーティションを追跡する必要があります。このメタデータは、初期化中にコネクタによって作成された Spanner テーブルに保持されます。コネクタの構成時に、このテーブルを作成するデータベースを指定します。

変更ストリームのベスト プラクティスで説明しているように、コネクタがアプリケーションのデータベースでメタデータ テーブルを格納するのではなく、この目的には新しいデータベースを作成することをおすすめします。

SpannerIO コネクタを使用する Dataflow ジョブのオーナーは、このメタデータ データベースで次の IAM 権限を有する必要があります。

  • spanner.databases.updateDdl
  • spanner.databases.beginReadOnlyTransaction
  • spanner.databases.beginOrRollbackReadWriteTransaction
  • spanner.databases.read
  • spanner.databases.select
  • spanner.databases.write
  • spanner.sessions.create
  • spanner.sessions.get

コネクタを構成する

Spanner 変更ストリーム コネクタは、次のように構成できます。

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");    // Needed for fine-grained access control only

Timestamp startTime = Timestamp.now();
Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(
   startTime.getSeconds() + (10 * 60),
   startTime.getNanos()
);

SpannerIO
  .readChangeStream()
  .withSpannerConfig(spannerConfig)
  .withChangeStreamName("my-change-stream")
  .withMetadataInstance("my-meta-instance-id")
  .withMetadataDatabase("my-meta-database-id")
  .withMetadataTable("my-meta-table-name")
  .withRpcPriority(RpcPriority.MEDIUM)
  .withInclusiveStartAt(startTime)
  .withInclusiveEndAt(endTime);

readChangeStream() オプションの説明は次のとおりです。

Spanner 構成ファイル(必須)

変更ストリームが作成されたプロジェクト、インスタンス、データベースの構成に使用され、クエリを行う必要があります。 また、必要に応じて、Dataflow ジョブを実行している IAM プリンシパルが細かいアクセス制御ユーザーである場合に使用するデータベース ロールを指定します。ジョブは、変更ストリームにアクセスするために、このデータベース ロールを前提としています。詳細については、細かいアクセス制御についてをご覧ください。

変更ストリーム名(必須)

この名前は、変更ストリームを一意に識別します。ここでの名前は、作成時に使用された名前と同じでなければなりません。

メタデータ インスタンス ID(オプション)

これは、コネクタが変更ストリーム API データの使用量を制御するために使用するメタデータを保存するインスタンスです。

メタデータ データベース ID(必須)

これは、コネクタが変更ストリーム API データの使用量を制御するために使用するメタデータを保存するデータベースです。

メタデータ テーブル名(オプション)

これは、既存のパイプラインを更新する場合にのみ使用します。

これは、コネクタで使用する既存のメタデータ テーブル名です。コネクタがメタデータを保存し、変更ストリーム API データの使用量を制御するために使用されます。このオプションを省略すると、Spanner はコネクタの初期化時に生成された名前を使用して新しいテーブルを作成します。

RPC 優先度(省略可)

変更ストリーム クエリに使用されるリクエストの優先度。このパラメータを省略すると、high priority が使用されます。

InclusiveStartAt(必須)

指定したタイムスタンプからの変更が呼び出し元に返されます。

InclusiveEndAt(省略可)

指定されたタイムスタンプまでの変更が呼び出し元に返されます。このパラメータを省略すると、変更は無期限に出力されます。

変更データを処理するための変換とシンクを追加する

上記の手順が完了すると、構成済みの SpannerIO コネクタが DataChangeRecord オブジェクトの PCollection を出力できるようになります。このストリーミングされたデータをさまざまな方法で処理するパイプライン構成例のいくつかについては、変換とシンクの例をご覧ください。

SpannerIO コネクタが出力する変更ストリーム レコードは、順序付けされていません。 これは、PCollection が順序を保証しないためです。順序付けられたストリームが必要な場合は、レコードをパイプラインの変換としてグループ化し、並べ替える必要があります。サンプル: キーによる並べ替えをご覧ください。このサンプルを拡張し、レコードの任意のフィールド(トランザクション ID など)に基づいてレコードを並べ替えることができます。

変換とシンクの例

独自の変換を定義し、データを書き込むシンクを指定できます。Apache Beam のドキュメントには、適用可能な多くの変換が用意されているだけでなく、I/O コネクタを使用してデータを外部システムに書き込むこともできます。

サンプル: キーで並べ替え

このコードサンプルは、Dataflow コネクタを使用して、commit コネクタのタイムスタンプ順に並べられ主キー別にグループ化されたデータ変更レコードを出力します。

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new BreakRecordByModFn()))
  .apply(ParDo.of(new KeyByIdFn()))
  .apply(ParDo.of(new BufferKeyUntilOutputTimestamp()))
  // Subsequent processing goes here

このコードサンプルは、状態とタイマーを使用して各キーのレコードをバッファに保存し、タイマーの有効期限を将来のユーザー構成時間 TBufferKeyUntilOutputTimestamp 関数で定義)に設定します。Dataflow ウォーターマークが時間 T を渡すと、このコードは、タイムスタンプが T 未満のバッファ内のすべてのレコードをフラッシュして、commit タイムスタンプでレコードを並べ替え、Key-Value ペアを出力します。ここで、

  • キーは入力キーで、サイズは 1000 のバケット配列にハッシュされた主キーです。
  • その値は、キーに対してバッファに保存された、順序付け済みのデータ変更レコードです。

キーごとに、以下が保証されます。

  • タイマーは、有効期限タイムスタンプの順序で起動します。
  • ダウンストリーム ステージでは、要素が生成された順序で受信します。

たとえば、値が 100 のキーの場合、タイマーはそれぞれ T1T10 で起動し、各タイムスタンプでデータ変更レコードのバンドルが生成されます。T1 で出力されたデータ変更レコードは、T10 で出力されたデータ変更レコードよりも前に生成されているため、T1 で出力されたデータ変更レコードも、T10 で出力されたデータ変更レコードより前に次のステージで受信されることが保証されます。この仕組みにより、ダウンストリーム処理での主キーごとの厳格な commit タイムスタンプの順序を保証できます。

このプロセスは、パイプラインが終了してすべてのデータ変更レコードが処理されるまで繰り返し実行されます(終了時刻が指定されていない場合は、無期限に繰り返されます)。

このコードサンプルでは、ウィンドウではなく状態とタイマーを使用して、キーごとに並べ替えを行っています。理由は、ウィンドウが順序どおりに処理される保証はないことです。つまり、古いウィンドウはより新しいウィンドウより後で処理できるため、順不同の処理になる可能性があります。

BreakRecordByModFn

各データ変更レコードには、いくつかの mod を含めることができます。各 mod は、単一の主キー値に対する挿入、更新、削除を表します。この関数は、各データ変更レコードを、mod ごとに 1 つずつ、別々のデータ変更レコードに分割します。

private static class BreakRecordByModFn extends DoFn<DataChangeRecord,
                                                     DataChangeRecord>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record, OutputReceiver<DataChangeRecord>
    outputReceiver) {
    record.getMods().stream()
      .map(
          mod ->
              new DataChangeRecord(
                  record.getPartitionToken(),
                  record.getCommitTimestamp(),
                  record.getServerTransactionId(),
                  record.isLastRecordInTransactionInPartition(),
                  record.getRecordSequence(),
                  record.getTableName(),
                  record.getRowType(),
                  Collections.singletonList(mod),
                  record.getModType(),
                  record.getValueCaptureType(),
                  record.getNumberOfRecordsInTransaction(),
                  record.getNumberOfPartitionsInTransaction(),
                  record.getTransactionTag(),
                  record.isSystemTransaction(),
                  record.getMetadata()))
      .forEach(outputReceiver::output);
  }
}

KeyByIdFn

この関数は DataChangeRecord を取り、整数値にハッシュされた Spanner 主キーをキーとする DataChangeRecord を出力します。

private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  // NUMBER_OF_BUCKETS should be configured by the user to match their key cardinality
  // Here, we are choosing to hash the Spanner primary keys to a bucket index, in order to have a deterministic number
  // of states and timers for performance purposes.
  // Note that having too many buckets might have undesirable effects if it results in a low number of records per bucket
  // On the other hand, having too few buckets might also be problematic, since many keys will be contained within them.
  private static final int NUMBER_OF_BUCKETS = 1000;

  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    int hashCode = (int) record.getMods().get(0).getKeysJson().hashCode();
    // Hash the received keys into a bucket in order to have a
    // deterministic number of buffers and timers.
    String bucketIndex = String.valueOf(hashCode % NUMBER_OF_BUCKETS);

    outputReceiver.output(KV.of(bucketIndex, record));
  }
}

BufferKeyUntilOutputTimestamp

タイマーとバッファはキーごとにあります。この関数は、ウォーターマークがバッファに保存されたデータ変更レコードを出力するタイムスタンプをウォーターマークが通過するまで、各データ変更レコードをバッファに保存します。

このコードはループタイマーを利用して、バッファをフラッシュするタイミングを判断します。

  1. キーのデータ変更レコードを初めて見つけると、データ変更レコードの commit タイムスタンプ + incrementIntervalSeconds(ユーザーが構成可能なオプション)にタイマーが起動するようにタイマーが設定されます。
  2. タイマーが起動すると、タイマーの有効期限よりも短いタイムスタンプを持つバッファ内のすべてのデータ変更レコードが recordsToOutput に追加されます。タイムスタンプがタイマーの有効期限以上のデータ変更レコードがバッファにある場合、そのようなデータ変更レコードは出力されるのではなく、バッファに追加され直します。つづいて、次のタイマーを現在のタイマーの有効期限に incrementIntervalInSeconds を足した値に設定します。
  3. recordsToOutput が空でない場合、この関数は recordsToOutput 内のデータ変更レコードを commit タイムスタンプとトランザクション ID で並べ替えて出力します。
private static class BufferKeyUntilOutputTimestamp extends
    DoFn<KV<String, DataChangeRecord>, KV<String, Iterable<DataChangeRecord>>>  {
  private static final Logger LOG =
      LoggerFactory.getLogger(BufferKeyUntilOutputTimestamp.class);

  private final long incrementIntervalInSeconds = 2;

  private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
    this.incrementIntervalInSeconds = incrementIntervalInSeconds;
  }

  @SuppressWarnings("unused")
  @TimerId("timer")
  private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("keyString")
  private final StateSpec<ValueState<String>> keyString =
      StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void process(
      @Element KV<String, DataChangeRecord> element,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    buffer.add(element.getValue());

    // Only set the timer if this is the first time we are receiving a data change
    // record with this key.
    String elementKey = keyString.read();
    if (elementKey == null) {
      Instant commitTimestamp =
          new Instant(element.getValue().getCommitTimestamp().toSqlTimestamp());
      Instant outputTimestamp =
          commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
      timer.set(outputTimestamp);
      keyString.write(element.getKey());
    }
  }

  @OnTimer("timer")
  public void onExpiry(
      OnTimerContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    if (!buffer.isEmpty().read()) {
      String elementKey = keyString.read();

      final List<DataChangeRecord> records =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .collect(Collectors.toList());
      buffer.clear();

      List<DataChangeRecord> recordsToOutput = new ArrayList<>();
      for (DataChangeRecord record : records) {
        Instant recordCommitTimestamp =
            new Instant(record.getCommitTimestamp().toSqlTimestamp());
        final String recordString =
            record.getMods().get(0).getNewValuesJson().isEmpty()
                ? "Deleted record"
                : record.getMods().get(0).getNewValuesJson();
        // When the watermark passes time T, this means that all records with
        // event time < T have been processed and successfully committed. Since the
        // timer fires when the watermark passes the expiration time, we should
        // only output records with event time < expiration time.
        if (recordCommitTimestamp.isBefore(context.timestamp())) {
          LOG.info(
             "Outputting record with key {} and value {} at expiration " +
             "timestamp {}",
              elementKey,
              recordString,
              context.timestamp().toString());
          recordsToOutput.add(record);
        } else {
          LOG.info(
              "Expired at {} but adding record with key {} and value {} back to " +
              "buffer due to commit timestamp {}",
              context.timestamp().toString(),
              elementKey,
              recordString,
              recordCommitTimestamp.toString());
          buffer.add(record);
        }
      }

      // Output records, if there are any to output.
      if (!recordsToOutput.isEmpty()) {
        // Order the records in place, and output them. The user would need
        // to implement DataChangeRecordComparator class that sorts the
        // data change records by commit timestamp and transaction ID.
        Collections.sort(recordsToOutput, new DataChangeRecordComparator());
        context.outputWithTimestamp(
            KV.of(elementKey, recordsToOutput), context.timestamp());
        LOG.info(
            "Expired at {}, outputting records for key {}",
            context.timestamp().toString(),
            elementKey);
      } else {
        LOG.info("Expired at {} with no records", context.timestamp().toString());
      }
    }

    Instant nextTimer = context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
    if (buffer.isEmpty() != null && !buffer.isEmpty().read()) {
      LOG.info("Setting next timer to {}", nextTimer.toString());
      timer.set(nextTimer);
    } else {
      LOG.info(
          "Timer not being set since the buffer is empty: ");
      keyString.clear();
    }
  }
}

トランザクションの注文

このパイプラインは、トランザクション ID と commit タイムスタンプの順序で変更できます。これを行うには、Spanner キーごとではなく、トランザクション ID / commit タイムスタンプのペアごとにレコードをバッファ保存します。これには、KeyByIdFn のコードを変更する必要があります。

サンプル: トランザクションを組み立てる

このコードサンプルは、データ変更レコードを読み取り、同じトランザクションに属するすべてのデータ変更レコードを 1 つの要素にまとめ、その要素を出力します。このサンプルコードで出力されるトランザクションは、commit タイムスタンプ順ではありません。

このコードサンプルは、バッファを使用してデータ変更レコードからトランザクションをアセンブルします。トランザクションに属するデータ変更レコードを初めて受信すると、データ変更レコードの numberOfRecordsInTransaction フィールドを読み取ります。これは、そのトランザクションに属するデータ変更レコードの想定数を記述しています。そのトランザクションに属するデータ変更レコードは、バッファ内のレコード数がバンドルされたデータ変更レコードを出力する numberOfRecordsInTransaction と一致するまでバッファ保存されます。

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new KeyByTransactionIdFn()))
  .apply(ParDo.of(new TransactionBoundaryFn()))
  // Subsequent processing goes here

KeyByTransactionIdFn

この関数は DataChangeRecord を取り、トランザクション ID をキーとする DataChangeRecord を出力します。

private static class KeyByTransactionIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    outputReceiver.output(KV.of(record.getServerTransactionId(), record));
  }
}

TransactionBoundaryFn

TransactionBoundaryFn は、KeyByTransactionIdFn から {TransactionId, DataChangeRecord} の Key-Value ペアを受信し、TransactionId に基づいてグループにバッファ保存します。バッファ内のレコード数がトランザクション全体に含まれるレコード数と同じである場合、この関数はグループ内の DataChangeRecord オブジェクトをレコード シーケンスで並べ替え、{CommitTimestamp, TransactionId} の Key-Value ペア(Iterable<DataChangeRecord>)を出力します。

ここで、SortKey は、{CommitTimestamp, TransactionId} ペアを表すユーザー定義クラスであると仮定しています。SortKey の詳細については、サンプル実装をご覧ください。

private static class TransactionBoundaryFn extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>>  {
  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @StateId("count") ValueState<Integer> countState) {
    final KV<String, DataChangeRecord> element = context.element();
    final DataChangeRecord record = element.getValue();

    buffer.add(record);
    int count = (countState.read() != null ? countState.read() : 0);
    count = count + 1;
    countState.write(count);

    if (count == record.getNumberOfRecordsInTransaction()) {
      final List<DataChangeRecord> sortedRecords =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
              .collect(Collectors.toList());

      final Instant commitInstant =
          new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp()
              .getTime());
      context.outputWithTimestamp(
          KV.of(
              new SortKey(sortedRecords.get(0).getCommitTimestamp(),
                          sortedRecords.get(0).getServerTransactionId()),
              sortedRecords),
          commitInstant);
      buffer.clear();
      countState.clear();
    }
  }
}

サンプル: トランザクション タグでフィルタする

ユーザーデータを変更するトランザクションがタグ付けされると、対応するタグとその種類は DataChangeRecord の一部として保存されます。次の例では、ユーザー定義のトランザクション タグとシステムタグに基づいて変更ストリーム レコードをフィルタする方法を示します。

my-tx-tag のユーザー定義タグ フィルタリング:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           !record.isSystemTransaction()
           && record.getTransactionTag().equalsIgnoreCase("my-tx-tag")))
  // Subsequent processing goes here

システムタグのフィルタリング / TTL 監査:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           record.isSystemTransaction()
           && record.getTransactionTag().equals("RowDeletionPolicy")))
  // Subsequent processing goes here

サンプル: 行全体を取得する

この例では、次の定義を持つ Singer という名前の Spanner テーブルを使用します。

CREATE TABLE Singers (
  SingerId INT64 NOT NULL,
  FirstName STRING(1024),
  LastName STRING(1024)
) PRIMARY KEY (SingerId);

変更ストリームのデフォルトの OLD_AND_NEW_VALUES 値キャプチャ モードでは、Spanner 行が更新されると、受信したデータ変更レコードには変更された列のみが含まれます。トラッキング対象であっても変更されていない列は、レコードに含まれません。mod の主キーは、データ変更レコードの commit タイムスタンプで Spanner スナップショットの読み取りに使用して、未変更の列を取得することや、行全体を取得することができます。

スナップショットの読み取りが成功するように、データベースの保持ポリシーを変更ストリームの保持ポリシー以上の値に変更しなければならない場合があります。

また、NEW_ROW 値キャプチャ タイプの使用が推奨され、より効率的な方法です。この方法では、デフォルトで行のトラッキングされたすべての列が返され、Spanner への追加のスナップショットの読み取りが不要であるためです。

SpannerConfig spannerConfig = SpannerConfig
   .create()
   .withProjectId("my-project-id")
   .withInstanceId("my-instance-id")
   .withDatabaseId("my-database-id")
   .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
   .apply(SpannerIO
       .readChangeStream()
       .withSpannerConfig(spannerConfig)
       // Assume we have a change stream "my-change-stream" that watches Singers table.
       .withChangeStreamName("my-change-stream")
       .withMetadataInstance("my-metadata-instance-id")
       .withMetadataDatabase("my-metadata-database-id")
       .withInclusiveStartAt(Timestamp.now()))
   .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
   // Subsequent processing goes here

ToFullRowJsonFn

この変換では、受信した各レコードの commit タイムスタンプでステイル読み取りを行い、行全体を JSON にマッピングします。

public class ToFullRowJsonFn extends DoFn<DataChangeRecord, String> {
 // Since each instance of this DoFn will create its own session pool and will
 // perform calls to Spanner sequentially, we keep the number of sessions in
 // the pool small. This way, we avoid wasting resources.
 private static final int MIN_SESSIONS = 1;
 private static final int MAX_SESSIONS = 5;
 private final String projectId;
 private final String instanceId;
 private final String databaseId;

 private transient DatabaseClient client;
 private transient Spanner spanner;

 public ToFullRowJsonFn(SpannerConfig spannerConfig) {
   this.projectId = spannerConfig.getProjectId().get();
   this.instanceId = spannerConfig.getInstanceId().get();
   this.databaseId = spannerConfig.getDatabaseId().get();
 }

 @Setup
 public void setup() {
   SessionPoolOptions sessionPoolOptions = SessionPoolOptions
      .newBuilder()
      .setMinSessions(MIN_SESSIONS)
      .setMaxSessions(MAX_SESSIONS)
      .build();
   SpannerOptions options = SpannerOptions
       .newBuilder()
       .setProjectId(projectId)
       .setSessionPoolOption(sessionPoolOptions)
       .build();
   DatabaseId id = DatabaseId.of(projectId, instanceId, databaseId);
   spanner = options.getService();
   client = spanner.getDatabaseClient(id);
 }

 @Teardown
 public void teardown() {
   spanner.close();
 }

 @ProcessElement
 public void process(
   @Element DataChangeRecord element,
   OutputReceiver<String> output) {
   com.google.cloud.Timestamp commitTimestamp = element.getCommitTimestamp();
   element.getMods().forEach(mod -> {
     JSONObject keysJson = new JSONObject(mod.getKeysJson());
     JSONObject newValuesJson = new JSONObject(mod.getNewValuesJson());
     ModType modType = element.getModType();
     JSONObject jsonRow = new JSONObject();
     long singerId = keysJson.getLong("SingerId");
     jsonRow.put("SingerId", singerId);
     if (modType == ModType.INSERT) {
       // For INSERT mod, get non-primary key columns from mod.
       jsonRow.put("FirstName", newValuesJson.get("FirstName"));
       jsonRow.put("LastName", newValuesJson.get("LastName"));
     } else if (modType == ModType.UPDATE) {
       // For UPDATE mod, get non-primary key columns by doing a snapshot read using the primary key column from mod.
       try (ResultSet resultSet = client
         .singleUse(TimestampBound.ofReadTimestamp(commitTimestamp))
         .read(
           "Singers",
           KeySet.singleKey(com.google.cloud.spanner.Key.of(singerId)),
             Arrays.asList("FirstName", "LastName"))) {
         if (resultSet.next()) {
           jsonRow.put("FirstName", resultSet.isNull("FirstName") ?
             JSONObject.NULL : resultSet.getString("FirstName"));
           jsonRow.put("LastName", resultSet.isNull("LastName") ?
             JSONObject.NULL : resultSet.getString("LastName"));
         }
       }
     } else {
       // For DELETE mod, there is nothing to do, as we already set SingerId.
     }

     output.output(jsonRow.toString());
   });
 }
}

このコードは、行全体を取得する Spanner データベース クライアントを作成し、ToFullReowJsonFn の 1 つのインスタンスで順次読み取りを実行するように、セッション プールが少数のセッションしか持たないように構成します。Dataflow を使用すると、この関数の多数のインスタンスが生成され、各インスタンスには独自のクライアント プールが必ず配置されるようになります。

サンプル: Spanner から Pub/Sub へ

このシナリオでは、呼び出し元はグループ化や集計を行わずに、可能な限り速やかに Pub/Sub へレコードをストリーミングします。Spanner テーブルに挿入された新しい行すべてをさらに処理するために Pub/Sub へストリーミングするため、ダウンストリーム処理のトリガーに適しています。

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(PubsubIO.writeStrings().to("my-topic"));

Pub/Sub シンクは、正確に 1 回のセマンティクスを実現するように構成できます。

サンプル: Spanner から Cloud Storage

このシナリオでは、呼び出し元は所定のウィンドウ内のすべてのレコードをグループ化し、グループを別の Cloud Storage ファイルに保存します。Spanner の保持期間から独立した分析やポイントインタイム アーカイブに適しています。

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
  .apply(TextIO
    .write()
    .to("gs://my-bucket/change-stream-results-")
    .withSuffix(".txt")
    .withWindowedWrites()
    .withNumShards(1));

Cloud Storage のシンクは、デフォルトで at-least-once(少なくとも 1 回)のセマンティクスを備えています。追加の処理を含めることで、1 回限りのセマンティクスになるように変更できます。

こうしたユースケース用に Dataflow テンプレートも用意されています。変更ストリームを Cloud Storage に接続するをご覧ください。

サンプル: Spanner から BigQuery(台帳)へ

ここで、呼び出し元は変更レコードを BigQuery にストリーミングします。各データ変更レコードは、BigQuery では 1 行として反映されます。これは分析に適しています。このコードは、前述の行全体を取得するセクションで定義した関数を使用して、レコードの完全な行を取得し、BigQuery に書き込みます。

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(spannerConfig)
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
  .apply(BigQueryIO
    .<String>write()
    .to("my-bigquery-table")
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
    .withSchema(new TableSchema().setFields(Arrays.asList(
      new TableFieldSchema()
        .setName("SingerId")
        .setType("INT64")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("FirstName")
        .setType("STRING")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("LastName")
        .setType("STRING")
        .setMode("REQUIRED")
    )))
    .withAutoSharding()
    .optimizedWrites()
    .withFormatFunction((String element) -> {
      ObjectMapper objectMapper = new ObjectMapper();
      JsonNode jsonNode = null;
      try {
        jsonNode = objectMapper.readTree(element);
      } catch (IOException e) {
        e.printStackTrace();
      }
      return new TableRow()
        .set("SingerId", jsonNode.get("SingerId").asInt())
        .set("FirstName", jsonNode.get("FirstName").asText())
        .set("LastName", jsonNode.get("LastName").asText());
    }
  )
);

BigQuery シンクは、デフォルトで少なくとも 1 回のセマンティクスを備えていることに注意してください。追加の処理を含めることで、1 回限りのセマンティクスになるように変更できます。

こうしたユースケース用に Dataflow テンプレートも用意されています。変更ストリームを BigQuery に接続するをご覧ください。

パイプラインをモニタリングする

変更ストリームの Dataflow パイプラインをモニタリングするために使用できる指標のクラスは 2 つあります。

標準の Dataflow 指標

Dataflow には、データの更新速度、システムラグ、ジョブのスループット、ワーカーの CPU 使用率など、ジョブが正常であることを確認するためのいくつかの指標が用意されています。詳細については、Dataflow パイプラインに Monitoring を使用するをご覧ください。

変更ストリーム パイプラインでは、2 つの主要な指標(システム レイテンシデータの鮮度)を考慮する必要があります。

システム レイテンシは、データ項目が処理されているか、処理を待機している、現時点での最長時間(秒)を示します。

データの更新速度は、現在(リアルタイム)と出力ウォーターマークの間の時間を示します。時間 T の出力ウォーターマークは、(厳密に)T より前のイベント時間を持つすべての要素が計算処理されたことを示します。つまり、データの更新速度は、受信したイベントの処理に関して、パイプラインがどれくらい最新の状態であるかを測定します。

パイプラインのリソースが不足している場合は、この 2 つの指標で効果を確認できます。項目の処理にはさらに時間がかかるため、システムのレイテンシが増加します。パイプラインが受信したデータの量に対応できないため、データの更新速度も向上します。

変更ストリームのカスタム指標

こうした指標は Cloud Monitoring で公開されており、次の指標があります。

  • Spanner で commit されたレコードからコネクタによって PCollection に出力されるまでのバケット化された(ヒストグラム)レイテンシ。この指標を使用して、パイプラインのパフォーマンス(レイテンシ)の問題を確認できます。
  • 読み取られたデータレコードの合計数。これは、コネクタによって出力されたレコードの数の全体的な指標です。この数値は、基盤となる Spanner データベースでの書き込みの傾向を反映して、常に増加するものです。
  • 読み取り中のパーティションの数。常に読み取り中のパーティションが存在する必要があります。この数値がゼロの場合は、パイプラインでエラーが発生したことを示します。
  • コネクタの実行中に発行されたクエリの合計数。これは、パイプラインの実行全体を通じて Spanner インスタンスに対して行われた変更ストリーム クエリ全般を示します。これを使用して、コネクタから Spanner データベースへの負荷を見積もることができます。

既存のパイプラインを更新する

ジョブの互換性チェックに合格すると、SpannerIO コネクタを使用して変更ストリームを処理する実行中のパイプラインを更新できます。それには、更新するときに、新しいジョブのメタデータ テーブル名パラメータを明示的に設定する必要があります。更新するジョブの metadataTable パイプライン オプションの値を使用します。

Google 提供の Dataflow テンプレートを使用している場合は、パラメータ spannerMetadataTableName を使用してテーブル名を設定します。既存のジョブを変更して、コネクタ構成でメソッド withMetadataTable(your-metadata-table-name) を使用してメタデータ テーブルを明示的に使用することもできます。完了したら、Dataflow ドキュメント内の置換ジョブの起動の手順に沿って、実行中のジョブを更新できます。

変更ストリームと Dataflow のベスト プラクティス

Dataflow を使用して変更ストリーム接続を構築するためのベスト プラクティスは次のとおりです。

別のメタデータ データベースを使用する

アプリケーション データベースを使用するように構成するのではなく、メタデータ ストレージに使用する SpannerIO コネクタ用の別のデータベースを作成することをおすすめします。

詳細については、別のメタデータ データベースを検討するをご覧ください。

クラスタのサイズを決める

原則として、Spanner 変更ストリーム ジョブの初期ワーカー数は、1,000 回/秒の書き込みあたり 1 ワーカーです。この見積もりは、各トランザクションのサイズ、単一のトランザクションから生成される変更ストリーム レコードの数、使用されている他の変換、集計、シンクなど、複数の要因によって異なる場合があります。

最初のリソーシング後、パイプラインをモニタリングするで説明されている指標を追跡して、パイプラインが正常であることを確認することが重要です。初期ワーカープールのサイズを試して、パイプラインが負荷をどのように処理するかをモニタリングし、必要に応じてノード数を増やすことをおすすめします。CPU 使用率は、負荷が適切で、より多くのノードが必要かどうかを確認する重要な指標です。

既知の制限事項

Dataflow で Spanner 変更ストリームを使用する場合、いくつかの既知の制限があります。

自動スケーリング

SpannerIO.readChangeStream を含むパイプラインの自動スケーリングをサポートするには、Apache Beam 2.39.0 以降が必要です。

2.39.0 より前のバージョンの Apache Beam を使用する場合、水平自動スケーリングに説明されているとおり、SpannerIO.readChangeStream を含むパイプラインでは、自動スケーリングのアルゴリズムを NONE として明示的に指定する必要があります。

自動スケーリングを使用せずに Dataflow パイプラインを手動でスケールするには、ストリーミング パイプラインを手動でスケールするをご覧ください。

Runner V2

Spanner 変更ストリーム コネクタには、Dataflow Runner V2 が必要です。これは、実行時に手動で指定する必要があります。そうしないと、エラーが発行されます。Runner V2 は、--experiments=use_unified_worker,use_runner_v2 でジョブを構成することで指定できます。

スナップショット

Spanner 変更ストリーム コネクタは、Dataflow スナップショットをサポートしていません。

ドレイン中

Spanner 変更ストリーム コネクタは、ジョブの排出をサポートしていません。既存のジョブをキャンセルすることのみ可能です。

また、停止する必要なしに、既存のパイプラインを更新することもできます。

OpenCensus

OpenCensus を使用してパイプラインをモニタリングするには、バージョン 0.28.3 以降を指定します。

パイプライン開始時の NullPointerException

Apache Beam バージョン 2.38.0バグにより、特定の条件下でパイプラインを開始すると NullPointerException が発生する可能性があります。これにより、ジョブは開始されず、代わりに次のエラー メッセージが表示されます。

java.lang.NullPointerException: null value in entry: Cloud Storage_PROJECT_ID=null

この問題に対処するには、Apache Beam バージョン 2.39.0 以降を使用するか、beam-sdks-java-core のバージョンを 2.37.0 として手動で指定します。

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.37.0</version>
</dependency>

詳細