Dataflow を使用して変更ストリーム接続を構築する

このページでは、変更ストリームを使用して、Spanner の変更データの使用と転送を行う Dataflow パイプラインを作成する方法を説明します。このページのサンプルコードを使用して、カスタム パイプラインを構築できます。

基本コンセプト

変更ストリーム用の Dataflow パイプラインの基本コンセプトは次のとおりです。

Dataflow

Dataflow は、ストリーミングとバッチ処理をサポートする、サーバーレスで高速かつ費用対効果の高いサービスです。オープンソースの Apache Beam ライブラリを使用して作成された処理ジョブでポータビリティを実現し、インフラストラクチャのプロビジョニングとクラスタ管理を自動化します。 Dataflow は準リアルタイムのストリーミングを行うようになります。変更ストリームから読み取る場合は、約 6 秒のレイテンシが発生します。

Dataflow では、SpannerIO コネクタを備えた Spanner 変更ストリームを使用できます。このコネクタは、変更ストリームをクエリするための Spanner API の抽象化を提供します。このコネクタを使用すると、Spanner API を直接使用する際に必要な変更ストリーム パーティションのライフサイクルを管理する必要がなくなります。コネクタによってデータ変更レコードのストリームが提供されるため、特定のアプリケーションの詳細や動的変更ストリームのパーティショニングよりも、アプリケーション ロジックに注力できます。変更ストリーム データを読み取る必要があるほとんどの場合で、Spanner API ではなく SpannerIO コネクタを使用することをおすすめします。

Dataflow テンプレートは、一般的なユースケースを実装できる事前構築された Dataflow パイプラインです。概要は、Dataflow テンプレートをご覧ください。

Dataflow パイプライン

Spanner 変更ストリーム Dataflow パイプラインは、4 つの主要部分で構成されています。

  1. 変更ストリームがある Spanner データベース
  2. SpannerIO コネクタ
  3. ユーザー定義の変換とシンク
  4. シンク I/O ライター

イメージ

これらのそれぞれを以下で詳しく説明します。

Spanner 変更ストリーム

変更ストリームの作成方法の詳細については、変更ストリームの作成をご覧ください。

Apache Beam SpannerIO コネクタ

これは、前述の SpannerIO コネクタです。そして、データ変更レコードの PCollection をパイプラインの後のステージに出力するソース I/O コネクタです。出力された各データ変更レコードのイベント時間は、commit タイムスタンプになります。出力されたレコードは順序付けがなく、SpannerIO コネクタは遅延レコードがないことを保証します。

変更ストリームを使用する場合、Dataflow はチェックポインティングを使用します。そのため、各ワーカーは変更のバッファリング中に最大 5 秒間待機してから、その変更を送信して処理を続行する場合があります。レイテンシは約 6 秒になると予想されます。

ユーザー定義の変換

ユーザー定義の変換を使用すると、Dataflow パイプライン内で処理データを集計、変換、変更できます。この一般的なユースケースには、個人を特定できる情報の削除、ダウンストリーム データ形式要件への準拠、並べ替えがあります。変換に関するプログラミング ガイドについては、Apache Beam の公式ドキュメントをご覧ください。

Apache Beam シンク I/O 書き込み

Apache Beam には、Dataflow パイプラインから BigQuery などのデータシンクに書き込むための組み込みの I/O 変換が含まれています。多くの一般的なデータシンクがネイティブにサポートされています。

Dataflow テンプレート

Dataflow テンプレートを使用すると、Google Cloud コンソール、Google Cloud CLI、または REST API 呼び出しを介して、一般的なユースケース用の事前に構築された Docker イメージに基づいて Dataflow ジョブを簡単に作成できます。

Spanner 変更ストリームについては、3 つの Dataflow Flex テンプレートが用意されています。

Dataflow パイプラインを構築する

このセクションでは、コネクタの初期構成について説明し、Spanner の変更ストリーム機能との一般的なインテグレーションのサンプルを提供します。

以下の手順を行うには、Dataflow 用の Java 開発環境が必要です。詳細については、Java を使用して Dataflow パイプラインを作成するをご覧ください。

変更ストリームを作成

変更ストリームの作成方法の詳細については、変更ストリームの作成をご覧ください。次のステップに進むには、変更ストリームが構成された Spanner データベースが必要です。

きめ細かなアクセス制御権限を付与する

きめ細かいアクセス制御ユーザーが Dataflow ジョブを実行する場合は、ユーザーに変更ストリームの SELECT 権限と変更ストリームのテーブル値関数に対する EXECUTE 権限を持つデータベース ロールへのアクセス権が付与されていることを確認してください。また、プリンシパルが SpannerIO 構成または Dataflow Flex テンプレートでデータベース ロールを指定していることを確認します。

詳細については、細かいアクセス制御についてをご覧ください。

SpannerIO コネクタを依存関係として追加する

Apache Beam SpannerIO コネクタは、Cloud Spanner API を介して変更ストリームを直接使用し、変更ストリーム データレコードの PCollection をパイプラインの後半に出力する複雑さをカプセル化します。

これらのオブジェクトは、ユーザーの Dataflow パイプラインの他のステージで使用できます。変更ストリーム インテグレーションは、SpannerIO コネクタの一部です。SpannerIO コネクタを使用するには、pom.xml ファイルに依存関係を追加する必要があります。

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam-version}</version> <!-- available from version 2.38.0 -->
</dependency>

メタデータ データベースの作成

コネクタは、Apache Beam パイプラインの実行時に各パーティションを追跡する必要があります。このメタデータは、初期化中にコネクタによって作成された Spanner テーブルに保持されます。コネクタの構成時に、このテーブルを作成するデータベースを指定します。

変更ストリームのベスト プラクティスで説明しているように、コネクタがアプリケーションのデータベースでメタデータ テーブルを格納するのではなく、この目的には新しいデータベースを作成することをおすすめします。

SpannerIO コネクタを使用する Dataflow ジョブのオーナーは、このメタデータ データベースで次の IAM 権限を設定する必要があります。

  • spanner.databases.updateDdl
  • spanner.databases.beginReadOnlyTransaction
  • spanner.databases.beginOrRollbackReadWriteTransaction
  • spanner.databases.read
  • spanner.databases.select
  • spanner.databases.write
  • spanner.sessions.create
  • spanner.sessions.get

コネクタを構成する

Spanner 変更ストリーム コネクタは、次のように構成できます。

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");    // Needed for fine-grained access control only

Timestamp startTime = Timestamp.now();
Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(
   startTime.getSeconds() + (10 * 60),
   startTime.getNanos()
);

SpannerIO
  .readChangeStream()
  .withSpannerConfig(spannerConfig)
  .withChangeStreamName("my-change-stream")
  .withMetadataInstance("my-meta-instance-id")
  .withMetadataDatabase("my-meta-database-id")
  .withMetadataTable("my-meta-table-name")
  .withRpcPriority(RpcPriority.MEDIUM)
  .withInclusiveStartAt(startTime)
  .withInclusiveEndAt(endTime);

readChangeStream() オプションの説明は次のとおりです。

Spanner 構成ファイル(必須)

変更ストリームが作成されたプロジェクト、インスタンス、データベースの構成に使用され、クエリを行う必要があります。 また、必要に応じて、Dataflow ジョブを実行している IAM プリンシパルが細かいアクセス制御ユーザーである場合に使用するデータベース ロールを指定します。ジョブは、変更ストリームにアクセスするために、このデータベース ロールを前提としています。詳細については、細かいアクセス制御についてをご覧ください。

変更ストリーム名(必須)

この名前は、変更ストリームを一意に識別します。ここでの名前は、作成時に使用された名前と同じでなければなりません。

メタデータ インスタンス ID(オプション)

これは、コネクタによって変更ストリーム API データの使用量を制御するために使用されるメタデータを保存するインスタンスです。

メタデータ データベース ID(必須)

これは、変更ストリーム API データの使用量を制御するためにコネクタで使用されるメタデータを格納するデータベースです。

メタデータ テーブル名(オプション)

これは、既存のパイプラインを更新する場合にのみ使用します。

これは、コネクタで使用される既存のメタデータ テーブルの名前です。コネクタがメタデータを保存し、変更ストリーム API データの使用量を制御するために使用されます。このオプションを省略すると、Spanner はコネクタの初期化時に生成された名前で新しいテーブルを作成します。

RPC 優先度(省略可)

変更ストリーム クエリに使用されるリクエスト優先度。このパラメータを省略すると、high priority が使用されます。

InclusiveStartAt(必須)

指定されたタイムスタンプの変更は、呼び出し元に返されます。

InclusiveEndAt(省略可)

指定されたタイムスタンプまでの変更が呼び出し元に返されます。このパラメータを省略すると、変更が無期限に発行されます。

変換データを処理するための変換とシンクを追加する

上記の手順が完了すると、構成済みの SpannerIO コネクタが DataChangeRecord オブジェクトの PCollection を出力できるようになります。このストリーミング データをさまざまな方法で処理するサンプル パイプライン構成については、変換とシンクの例をご覧ください。

SpannerIO コネクタが出力する変更ストリーム レコードは、順序付けされていません。これは、PCollection が順序を保証しないためです。順序付けられたストリームが必要な場合は、レコードをパイプラインの変換としてグループ化し、並べ替える必要があります。サンプル: キーによる並べ替えをご覧ください。このサンプルを拡張し、レコードの任意のフィールド(トランザクション ID など)に基づいてレコードを並べ替えることができます。

変換とシンクの例

独自の変換を定義し、データを書き込むシンクを指定できます。Apache Beam のドキュメントには、さまざまな変換を適用でき、I/O コネクタを使用して外部システムにデータを書き込む準備ができています。

サンプル: キーで並べ替え

このコードサンプルは、Dataflow コネクタを使用して、commit コネクタのタイムスタンプ順に並べられ主キー別にグループ化されたデータ変更レコードを出力します。

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new BreakRecordByModFn()))
  .apply(ParDo.of(new KeyByIdFn()))
  .apply(ParDo.of(new BufferKeyUntilOutputTimestamp()))
  // Subsequent processing goes here

このコードサンプルは、状態とタイマーを使用して各キーのレコードをバッファに保存し、タイマーの有効期限を将来のユーザー構成時間 TBufferKeyUntilOutputTimestamp 関数で定義)に設定します。Dataflow ウォーターマークが時間 T を渡すと、このコードは、タイムスタンプが T 未満のバッファ内のすべてのレコードをフラッシュして、commit タイムスタンプでレコードを並べ替え、Key-Value ペアを出力します。ここで、

  • キーは入力キーで、サイズは 1000 のバケット配列にハッシュされた主キーです。
  • その値は、キーに対してバッファに保存された、順序付け済みのデータ変更レコードです。

キーごとに、以下が保証されます。

  • タイマーは、有効期限タイムスタンプの順序で起動します。
  • ダウンストリーム ステージでは、要素が生成された順序で受信します。

たとえば、値が 100 のキーの場合、タイマーはそれぞれ T1T10 で起動し、各タイムスタンプでデータ変更レコードのバンドルが生成されます。T1 で出力されたデータ変更レコードは、T10 で出力されたデータ変更レコードよりも前に生成されているため、T1 で出力されたデータ変更レコードも、T10 で出力されたデータ変更レコードより前に次のステージで受信されることが保証されます。この仕組みにより、ダウンストリーム処理での主キーごとの厳格な commit タイムスタンプの順序を保証できます。

このプロセスは、パイプラインが終了してすべてのデータ変更レコードが処理されるまで繰り返し実行されます(終了時刻が指定されていない場合は、無期限に繰り返されます)。

このコードサンプルでは、ウィンドウではなく状態とタイマーを使用して、キーごとに順序付けを行います。これは、ウィンドウが順番に処理されることが保証されていないためです。つまり、古いウィンドウはより新しいウィンドウより後で処理できるため、順不同の処理になる可能性があります。

BreakRecordByModFn

各データ変更レコードには複数の mod を含めることができます。各 mod は、1 つの主キー値に対する挿入、更新、削除を表します。この関数は、各データ変更レコードを、mod ごとに 1 つずつ、別々のデータ変更レコードに分割します。

private static class BreakRecordByModFn extends DoFn<DataChangeRecord,
                                                     DataChangeRecord>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record, OutputReceiver<DataChangeRecord>
    outputReceiver) {
    record.getMods().stream()
      .map(
          mod ->
              new DataChangeRecord(
                  record.getPartitionToken(),
                  record.getCommitTimestamp(),
                  record.getServerTransactionId(),
                  record.isLastRecordInTransactionInPartition(),
                  record.getRecordSequence(),
                  record.getTableName(),
                  record.getRowType(),
                  Collections.singletonList(mod),
                  record.getModType(),
                  record.getValueCaptureType(),
                  record.getNumberOfRecordsInTransaction(),
                  record.getNumberOfPartitionsInTransaction(),
                  record.getTransactionTag(),
                  record.isSystemTransaction(),
                  record.getMetadata()))
      .forEach(outputReceiver::output);
  }
}

KeyByIdFn

この関数は DataChangeRecord を取り、整数値にハッシュされた Spanner 主キーをキーとする DataChangeRecord を出力します。

private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  // NUMBER_OF_BUCKETS should be configured by the user to match their key cardinality
  // Here, we are choosing to hash the Spanner primary keys to a bucket index, in order to have a deterministic number
  // of states and timers for performance purposes.
  // Note that having too many buckets might have undesirable effects if it results in a low number of records per bucket
  // On the other hand, having too few buckets might also be problematic, since many keys will be contained within them.
  private static final int NUMBER_OF_BUCKETS = 1000;

  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    int hashCode = (int) record.getMods().get(0).getKeysJson().hashCode();
    // Hash the received keys into a bucket in order to have a
    // deterministic number of buffers and timers.
    String bucketIndex = String.valueOf(hashCode % NUMBER_OF_BUCKETS);

    outputReceiver.output(KV.of(bucketIndex, record));
  }
}

BufferKeyUntilOutputTimestamp

タイマーとバッファはキーごとにあります。この関数は、ウォーターマークがバッファに保存されたデータ変更レコードを出力するタイムスタンプをウォーターマークが通過するまで、各データ変更レコードをバッファに保存します。

このコードはループタイマーを利用して、バッファをフラッシュするタイミングを判断します。

  1. キーのデータ変更レコードを初めて見つけると、データ変更レコードの commit タイムスタンプ + incrementIntervalSeconds(ユーザーが構成可能なオプション)にタイマーが起動するようにタイマーが設定されます。
  2. タイマーが起動すると、タイマーの有効期限よりも短いタイムスタンプを持つバッファ内のすべてのデータ変更レコードが recordsToOutput に追加されます。タイムスタンプにタイマーの有効期限以上のデータ変更レコードがある場合、そのデータ変更レコードを出力するのではなく、バッファに追加し直します。つづいて、次のタイマーを現在のタイマーの有効期限に incrementIntervalInSeconds を足した値に設定します。
  3. recordsToOutput が空でない場合、この関数は、recordsToOutput 内のデータ変更レコードを commit タイムスタンプとトランザクション ID で並べ替え、出力します。
private static class BufferKeyUntilOutputTimestamp extends
    DoFn<KV<String, DataChangeRecord>, KV<String, Iterable<DataChangeRecord>>>  {
  private static final Logger LOG =
      LoggerFactory.getLogger(BufferKeyUntilOutputTimestamp.class);

  private final long incrementIntervalInSeconds = 2;

  private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
    this.incrementIntervalInSeconds = incrementIntervalInSeconds;
  }

  @SuppressWarnings("unused")
  @TimerId("timer")
  private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("keyString")
  private final StateSpec<ValueState<String>> keyString =
      StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void process(
      @Element KV<String, DataChangeRecord> element,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    buffer.add(element.getValue());

    // Only set the timer if this is the first time we are receiving a data change
    // record with this key.
    String elementKey = keyString.read();
    if (elementKey == null) {
      Instant commitTimestamp =
          new Instant(element.getValue().getCommitTimestamp().toSqlTimestamp());
      Instant outputTimestamp =
          commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
      timer.set(outputTimestamp);
      keyString.write(element.getKey());
    }
  }

  @OnTimer("timer")
  public void onExpiry(
      OnTimerContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    if (!buffer.isEmpty().read()) {
      String elementKey = keyString.read();

      final List<DataChangeRecord> records =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .collect(Collectors.toList());
      buffer.clear();

      List<DataChangeRecord> recordsToOutput = new ArrayList<>();
      for (DataChangeRecord record : records) {
        Instant recordCommitTimestamp =
            new Instant(record.getCommitTimestamp().toSqlTimestamp());
        final String recordString =
            record.getMods().get(0).getNewValuesJson().isEmpty()
                ? "Deleted record"
                : record.getMods().get(0).getNewValuesJson();
        // When the watermark passes time T, this means that all records with
        // event time < T have been processed and successfully committed. Since the
        // timer fires when the watermark passes the expiration time, we should
        // only output records with event time < expiration time.
        if (recordCommitTimestamp.isBefore(context.timestamp())) {
          LOG.info(
             "Outputting record with key {} and value {} at expiration " +
             "timestamp {}",
              elementKey,
              recordString,
              context.timestamp().toString());
          recordsToOutput.add(record);
        } else {
          LOG.info(
              "Expired at {} but adding record with key {} and value {} back to " +
              "buffer due to commit timestamp {}",
              context.timestamp().toString(),
              elementKey,
              recordString,
              recordCommitTimestamp.toString());
          buffer.add(record);
        }
      }

      // Output records, if there are any to output.
      if (!recordsToOutput.isEmpty()) {
        // Order the records in place, and output them. The user would need
        // to implement DataChangeRecordComparator class that sorts the
        // data change records by commit timestamp and transaction ID.
        Collections.sort(recordsToOutput, new DataChangeRecordComparator());
        context.outputWithTimestamp(
            KV.of(elementKey, recordsToOutput), context.timestamp());
        LOG.info(
            "Expired at {}, outputting records for key {}",
            context.timestamp().toString(),
            elementKey);
      } else {
        LOG.info("Expired at {} with no records", context.timestamp().toString());
      }
    }

    Instant nextTimer = context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
    if (buffer.isEmpty() != null && !buffer.isEmpty().read()) {
      LOG.info("Setting next timer to {}", nextTimer.toString());
      timer.set(nextTimer);
    } else {
      LOG.info(
          "Timer not being set since the buffer is empty: ");
      keyString.clear();
    }
  }
}

トランザクションの順序指定

このパイプラインは、トランザクション ID と commit タイムスタンプの順序に変更できます。これを行うには、Spanner キーごとではなく、トランザクション ID / commit タイムスタンプのペアごとにレコードをバッファ保存します。これを行うには、KeyByIdFn のコードを変更する必要があります。

サンプル: トランザクションの構築

このコードサンプルは、データ変更レコードを読み取り、同じトランザクションに属するすべてのデータ変更レコードを 1 つの要素にまとめ、その要素を出力します。このサンプルコードで出力されるトランザクションは、commit タイムスタンプ順ではありません。

このコードサンプルは、バッファを使用してデータ変更レコードからトランザクションを作成します。トランザクションに属するデータ変更レコードを初めて受信すると、データ変更レコードの numberOfRecordsInTransaction フィールドを読み取ります。ここには、そのトランザクションに属するデータ変更レコードの想定数が示されます。そのトランザクションに属するデータ変更レコードは、バッファ内のレコード数がバンドルされたデータ変更レコードを出力する numberOfRecordsInTransaction と一致するまでバッファ保存されます。

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new KeyByTransactionIdFn()))
  .apply(ParDo.of(new TransactionBoundaryFn()))
  // Subsequent processing goes here

KeyByTransactionIdFn

この関数は、DataChangeRecord を受け取り、トランザクション ID をキーとする DataChangeRecord を出力します。

private static class KeyByTransactionIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    outputReceiver.output(KV.of(record.getServerTransactionId(), record));
  }
}

TransactionBoundaryFn

TransactionBoundaryFn は、KeyByTransactionIdFn から {TransactionId, DataChangeRecord} の Key-Value ペアを受信し、TransactionId に基づいてグループにバッファ保存します。バッファ内のレコード数がトランザクション全体に含まれるレコード数と同じである場合、この関数はグループ内の DataChangeRecord オブジェクトをレコード シーケンスで並べ替え、{CommitTimestamp, TransactionId} の Key-Value ペア(Iterable<DataChangeRecord>)を出力します。

ここで、SortKey は、{CommitTimestamp, TransactionId} ペアを表すユーザー定義クラスであると仮定しています。SortKey実装例をご覧ください。

private static class TransactionBoundaryFn extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>>  {
  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @StateId("count") ValueState<Integer> countState) {
    final KV<String, DataChangeRecord> element = context.element();
    final DataChangeRecord record = element.getValue();

    buffer.add(record);
    int count = (countState.read() != null ? countState.read() : 0);
    count = count + 1;
    countState.write(count);

    if (count == record.getNumberOfRecordsInTransaction()) {
      final List<DataChangeRecord> sortedRecords =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
              .collect(Collectors.toList());

      final Instant commitInstant =
          new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp()
              .getTime());
      context.outputWithTimestamp(
          KV.of(
              new SortKey(sortedRecords.get(0).getCommitTimestamp(),
                          sortedRecords.get(0).getServerTransactionId()),
              sortedRecords),
          commitInstant);
      buffer.clear();
      countState.clear();
    }
  }
}

サンプル: トランザクション タグでフィルタする

ユーザーデータを変更するトランザクションがタグ付けされると、対応するタグとその種類は DataChangeRecord の一部として保存されます。次の例では、ユーザー定義のトランザクション タグとシステムタグに基づいて変更ストリーム レコードをフィルタする方法を示します。

my-tx-tag に対するユーザー定義のタグ フィルタリング:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           !record.isSystemTransaction()
           && record.getTransactionTag().equalsIgnoreCase("my-tx-tag")))
  // Subsequent processing goes here

システムタグのフィルタリング / TTL 監査:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           record.isSystemTransaction()
           && record.getTransactionTag().equals("RowDeletionPolicy")))
  // Subsequent processing goes here

サンプル: 行全体を取得する

この例では、次の定義を持つ Singer という名前の Spanner テーブルを使用します。

CREATE TABLE Singers (
  SingerId INT64 NOT NULL,
  FirstName STRING(1024),
  LastName STRING(1024)
) PRIMARY KEY (SingerId);

変更ストリームのデフォルトの OLD_AND_NEW_VALUES 値キャプチャ モードで、Spanner の行を更新したとき、受信したデータ変更レコードには変更された列のみが含まれます。トラックされたものの、変更されていない列は、レコードに含まれません。mod の主キーは、データ変更レコードの commit タイムスタンプで Spanner スナップショットの読み取りに使用して、未変更の列を取得することや、行全体を取得することができます。

スナップショットの読み取りを成功させるには、データベース保持ポリシーを、変更ストリーム保持ポリシー以上の値に変更する必要がある場合があります。

また、NEW_ROW 値キャプチャ タイプの使用が推奨され、より効率的な方法です。この方法では、デフォルトで行のトラッキングされたすべての列が返され、Spanner への追加のスナップショットの読み取りが不要であるためです。

SpannerConfig spannerConfig = SpannerConfig
   .create()
   .withProjectId("my-project-id")
   .withInstanceId("my-instance-id")
   .withDatabaseId("my-database-id")
   .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
   .apply(SpannerIO
       .readChangeStream()
       .withSpannerConfig(spannerConfig)
       // Assume we have a change stream "my-change-stream" that watches Singers table.
       .withChangeStreamName("my-change-stream")
       .withMetadataInstance("my-metadata-instance-id")
       .withMetadataDatabase("my-metadata-database-id")
       .withInclusiveStartAt(Timestamp.now()))
   .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
   // Subsequent processing goes here

ToFullRowJsonFn

この変換では、受信した各レコードの commit タイムスタンプでステイル読み取りを実行し、行全体を JSON にマッピングします。

public class ToFullRowJsonFn extends DoFn<DataChangeRecord, String> {
 // Since each instance of this DoFn will create its own session pool and will
 // perform calls to Spanner sequentially, we keep the number of sessions in
 // the pool small. This way, we avoid wasting resources.
 private static final int MIN_SESSIONS = 1;
 private static final int MAX_SESSIONS = 5;
 private final String projectId;
 private final String instanceId;
 private final String databaseId;

 private transient DatabaseClient client;
 private transient Spanner spanner;

 public ToFullRowJsonFn(SpannerConfig spannerConfig) {
   this.projectId = spannerConfig.getProjectId().get();
   this.instanceId = spannerConfig.getInstanceId().get();
   this.databaseId = spannerConfig.getDatabaseId().get();
 }

 @Setup
 public void setup() {
   SessionPoolOptions sessionPoolOptions = SessionPoolOptions
      .newBuilder()
      .setMinSessions(MIN_SESSIONS)
      .setMaxSessions(MAX_SESSIONS)
      .build();
   SpannerOptions options = SpannerOptions
       .newBuilder()
       .setProjectId(projectId)
       .setSessionPoolOption(sessionPoolOptions)
       .build();
   DatabaseId id = DatabaseId.of(projectId, instanceId, databaseId);
   spanner = options.getService();
   client = spanner.getDatabaseClient(id);
 }

 @Teardown
 public void teardown() {
   spanner.close();
 }

 @ProcessElement
 public void process(
   @Element DataChangeRecord element,
   OutputReceiver<String> output) {
   com.google.cloud.Timestamp commitTimestamp = element.getCommitTimestamp();
   element.getMods().forEach(mod -> {
     JSONObject keysJson = new JSONObject(mod.getKeysJson());
     JSONObject newValuesJson = new JSONObject(mod.getNewValuesJson());
     ModType modType = element.getModType();
     JSONObject jsonRow = new JSONObject();
     long singerId = keysJson.getLong("SingerId");
     jsonRow.put("SingerId", singerId);
     if (modType == ModType.INSERT) {
       // For INSERT mod, get non-primary key columns from mod.
       jsonRow.put("FirstName", newValuesJson.get("FirstName"));
       jsonRow.put("LastName", newValuesJson.get("LastName"));
     } else if (modType == ModType.UPDATE) {
       // For UPDATE mod, get non-primary key columns by doing a snapshot read using the primary key column from mod.
       try (ResultSet resultSet = client
         .singleUse(TimestampBound.ofReadTimestamp(commitTimestamp))
         .read(
           "Singers",
           KeySet.singleKey(com.google.cloud.spanner.Key.of(singerId)),
             Arrays.asList("FirstName", "LastName"))) {
         if (resultSet.next()) {
           jsonRow.put("FirstName", resultSet.isNull("FirstName") ?
             JSONObject.NULL : resultSet.getString("FirstName"));
           jsonRow.put("LastName", resultSet.isNull("LastName") ?
             JSONObject.NULL : resultSet.getString("LastName"));
         }
       }
     } else {
       // For DELETE mod, there is nothing to do, as we already set SingerId.
     }

     output.output(jsonRow.toString());
   });
 }
}

このコードは、行の全取得を実行する Spanner データベース クライアントを作成し、ToFullRowJsonFn の 1 つのインスタンスで順次読み取りを実行するように、セッション プールが少数のセッションしか持たないように構成します。Dataflow では、この関数のインスタンスが複数作成され、それぞれに独自のクライアント プールが作成されます。

サンプル: Spanner から Pub/Sub へ

このシナリオでは、呼び出し元はグループ化や集計を行わずに、可能な限り速やかに Pub/Sub へレコードをストリーミングします。Spanner テーブルに挿入された新しい行すべてをさらに処理するために Pub/Sub へストリーミングするため、ダウンストリーム処理のトリガーに適しています。

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(PubsubIO.writeStrings().to("my-topic"));

Pub/Sub シンクは、正確に 1 回のセマンティクスを実現するように構成できます。

サンプル: Spanner から Cloud Storage

このシナリオでは、呼び出し元が、特定のウィンドウ内のすべてのレコードをグループ化し、そのグループを別々の Cloud Storage ファイルに保存します。Spanner の保持期間とは無関係に、分析とポイントインタイム アーカイブに適しています。

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
  .apply(TextIO
    .write()
    .to("gs://my-bucket/change-stream-results-")
    .withSuffix(".txt")
    .withWindowedWrites()
    .withNumShards(1));

Cloud Storage のシンクは、デフォルトで at-least-once(少なくとも 1 回)のセマンティクスを備えています。追加の処理を含めることで、1 回限りのセマンティクスになるように変更できます。

こうしたユースケース用に Dataflow テンプレートも用意されています。変更ストリームを Cloud Storage に接続するをご覧ください。

サンプル: Spanner から BigQuery(台帳)へ

呼び出し元は、変更レコードを BigQuery にストリーミングします。各データ変更レコードは、BigQuery に 1 行に反映されます。これは分析に適しています。このコードは、前述の行全体を取得するセクションで定義した関数を使用して、レコードの完全な行を取得し、BigQuery に書き込みます。

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(spannerConfig)
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
  .apply(BigQueryIO
    .<String>write()
    .to("my-bigquery-table")
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
    .withSchema(new TableSchema().setFields(Arrays.asList(
      new TableFieldSchema()
        .setName("SingerId")
        .setType("INT64")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("FirstName")
        .setType("STRING")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("LastName")
        .setType("STRING")
        .setMode("REQUIRED")
    )))
    .withAutoSharding()
    .optimizedWrites()
    .withFormatFunction((String element) -> {
      ObjectMapper objectMapper = new ObjectMapper();
      JsonNode jsonNode = null;
      try {
        jsonNode = objectMapper.readTree(element);
      } catch (IOException e) {
        e.printStackTrace();
      }
      return new TableRow()
        .set("SingerId", jsonNode.get("SingerId").asInt())
        .set("FirstName", jsonNode.get("FirstName").asText())
        .set("LastName", jsonNode.get("LastName").asText());
    }
  )
);

BigQuery シンクは、デフォルトで少なくとも 1 回のセマンティクスを備えていることに注意してください。追加の処理を含めることで、1 回限りのセマンティクスになるように変更できます。

こうしたユースケース用に Dataflow テンプレートも用意されています。変更ストリームを BigQuery に接続するをご覧ください。

パイプラインをモニタリングする

変更ストリームの Dataflow パイプラインをモニタリングするために使用できる指標のクラスは 2 つあります。

標準の Dataflow 指標

Dataflow には、データの更新頻度、システムラグ、ジョブのスループット、ワーカーの CPU 使用率など、ジョブを正常に保つための指標がいくつか用意されています。詳細については、Dataflow パイプラインに Monitoring を使用するをご覧ください。

変更ストリーム パイプラインでは、2 つの主要な指標(システム レイテンシデータの鮮度)を考慮する必要があります。

システム レイテンシは、データ項目が処理されているか、処理を待機している、現時点での最長時間(秒)を示します。

データの更新頻度は、現在(リアルタイム)から出力ウォーターマークまでの経過時間を示します。時間 T の出力ウォーターマークは、T より前のイベント時間を持つすべての要素が計算処理されたことを示します。言い換えると、データの鮮度は、受信したイベントの処理に関してパイプラインがどの程度新しいかを測定します。

パイプラインのリソースが不足している場合は、この 2 つの指標で効果を確認できます。項目の処理にはさらに時間がかかるため、システムのレイテンシが増加します。パイプラインは受信したデータの量に追いつけないため、データの鮮度も向上します。

カスタム変更ストリームの指標

こうした指標は Cloud Monitoring で公開されており、次の指標があります。

  • Spanner で commit されているレコードがコネクタから PCollection に出力されるまでのバケット化(ヒストグラム)レイテンシ。この指標を使用すると、パイプラインのパフォーマンス(レイテンシ)の問題を確認できます。
  • 読み取られたデータレコードの合計数。これは、コネクタによって出力されたレコードの数の全体的な指標です。この数は増えており、基盤となる Spanner データベースへの書き込みの傾向を反映しています。
  • 現在読み取られているパーティションの数。常にパーティションが読み取られる必要があります。この数値がゼロの場合、パイプラインでエラーが発生したことを示します。
  • コネクタの実行中に発行されたクエリの合計数。これは、パイプラインの実行全体を通じて Spanner インスタンスに対して行われた変更ストリーム クエリ全般を示します。これを使用して、コネクタから Spanner データベースへの負荷を見積もることができます。

既存のパイプラインを更新する

ジョブの互換性チェックに合格すると、SpannerIO コネクタを使用して変更ストリームを処理する実行中のパイプラインを更新できます。それには、更新するときに、新しいジョブのメタデータ テーブル名パラメータを明示的に設定する必要があります。更新するジョブの metadataTable パイプライン オプションの値を使用します。

Google 提供の Dataflow テンプレートを使用している場合は、パラメータ spannerMetadataTableName を使用してテーブル名を設定します。また、コネクタ構成で withMetadataTable(your-metadata-table-name) メソッドを使用して、メタデータ テーブルを明示的に使用するように既存のジョブを変更することもできます。完了したら、Dataflow ドキュメント内の置換ジョブの起動の手順に沿って、実行中のジョブを更新できます。

変更ストリームと Dataflow のベスト プラクティス

Dataflow を使用して変更ストリーム接続を構築するためのベスト プラクティスを以下に示します。

別のメタデータ データベースを使用する

アプリケーション データベースを使用するように構成するのではなく、メタデータ ストレージに使用する SpannerIO コネクタ用の別のデータベースを作成することをおすすめします。

詳細については、別のメタデータ データベースを検討するをご覧ください。

クラスタのサイズを決める

原則として、Spanner 変更ストリーム ジョブの初期ワーカー数は、1,000 回/秒の書き込みあたり 1 ワーカーです。この見積もりは、各トランザクションのサイズ、単一のトランザクションから生成される変更ストリーム レコードの数、使用されている他の変換、集計、シンクなど、複数の要因によって異なる場合があります。

最初のリソーシング後、パイプラインをモニタリングするで説明されている指標を追跡して、パイプラインが正常であることを確認することが重要です。初期ワーカープールのサイズを試して、パイプラインが負荷をどのように処理するかをモニタリングし、必要に応じてノード数を増やすことをおすすめします。CPU 使用率は、負荷が適切で、より多くのノードが必要かどうかを確認する重要な指標です。

既知の制限事項

自動スケーリング

SpannerIO.readChangeStream を含むパイプラインの自動スケーリングのサポートには、Apache Beam 2.39.0 以降が必要です。

2.39.0 より前のバージョンの Apache Beam を使用する場合、水平自動スケーリングに説明されているとおり、SpannerIO.readChangeStream を含むパイプラインでは、自動スケーリングのアルゴリズムを NONE として明示的に指定する必要があります。

自動スケーリングを使用せずに Dataflow パイプラインを手動でスケールするには、ストリーミング パイプラインを手動でスケールするをご覧ください。

Runner V2

Spanner 変更ストリーム コネクタには Dataflow Runner V2 が必要です。 実行中にこれを手動で指定する必要があります。指定しない場合、エラーがスローされます。Runner V2 は、--experiments=use_unified_worker,use_runner_v2 でジョブを構成することで指定できます。

スナップショット

Spanner 変更ストリーム コネクタは、Dataflow スナップショットをサポートしていません。

ドレイン中

Spanner 変更ストリーム コネクタは、ジョブの排出をサポートしていません。既存のジョブをキャンセルすることのみ可能です。

また、停止せずに既存のパイプラインを更新することもできます。

OpenCensus

OpenCensus を使用してパイプラインをモニタリングするには、バージョン 0.28.3 以降を指定します。

パイプライン開始時の NullPointerException

Apache Beam バージョン 2.38.0バグにより、特定の条件下でパイプラインを開始すると、NullPointerException が発生することがあります。これにより、ジョブが開始せず、代わりに次のエラー メッセージが表示されます。

java.lang.NullPointerException: null value in entry: Cloud Storage_PROJECT_ID=null

この問題に対処するには、Apache Beam バージョン 2.39.0 以降を使用するか、beam-sdks-java-core のバージョンを 2.37.0 として手動で指定します。

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.37.0</version>
</dependency>

詳細情報