Bigtable 変更ストリームを処理する


このチュートリアルでは、Bigtable テーブルの変更ストリームをソースとするデータベース変更のリアルタイム ストリーム用のデータ パイプラインを Dataflow にデプロイする方法について説明します。パイプラインの出力は Cloud Storage 上の一連のファイルに書き込まれます。

音楽再生アプリケーションのサンプル データセットを用意しています。このチュートリアルでは、再生された曲を追跡し、一定期間にわたり上位 5 位をランク付けします。

このチュートリアルは、コードの記述と Google Cloud へのデータ パイプラインのデプロイに精通している技術ユーザーを対象としています。

目標

このチュートリアルでは、次の方法を説明します。

  • 変更ストリームを有効にして Bigtable テーブルを作成する。
  • 変更ストリームを変換して出力するパイプラインを Dataflow にデプロイする。
  • データ パイプラインの結果を表示する。

費用

このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

このドキュメントに記載されているタスクの完了後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。

始める前に

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Google Cloud CLI をインストールします。
  3. gcloud CLI を初期化するには:

    gcloud init
  4. Google Cloud プロジェクトを作成または選択します

    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、実際の Google Cloud プロジェクト名に置き換えます。

  5. Google Cloud プロジェクトで課金が有効になっていることを確認します

  6. Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage API を有効にします。

    gcloud services enable dataflow.googleapis.combigtable.googleapis.combigtableadmin.googleapis.comstorage.googleapis.com
  7. Google Cloud CLI をインストールします。
  8. gcloud CLI を初期化するには:

    gcloud init
  9. Google Cloud プロジェクトを作成または選択します

    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、実際の Google Cloud プロジェクト名に置き換えます。

  10. Google Cloud プロジェクトで課金が有効になっていることを確認します

  11. Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage API を有効にします。

    gcloud services enable dataflow.googleapis.combigtable.googleapis.combigtableadmin.googleapis.comstorage.googleapis.com
  12. cbt CLI を更新してインストールします。
    gcloud components update
    gcloud components install cbt
    

環境を準備する

コードを取得する

サンプルコードを含むリポジトリのクローンを作成します。このリポジトリをすでにダウンロードしている場合は、pull して最新バージョンにしてください。

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams

バケットの作成

  • Create a Cloud Storage bucket:
    gcloud storage buckets create gs://BUCKET_NAME
    Replace BUCKET_NAME with a bucket name that meets the bucket naming requirements.

    Bigtable インスタンスを作成する

    このチュートリアルでは、既存のインスタンスを使用することも、近くのリージョンにデフォルト構成でインスタンスを作成することもできます。

    テーブルを作成する

    このサンプル アプリケーションは、ユーザーが再生している曲を追跡し、リッスン イベントを Bigtable に保存します。1 つの列ファミリー(cf)と 1 つの列(曲)があり、行キーにユーザー ID を使用するテーブルを作成し、変更ストリームを有効にします。

    テーブルを作成します。

    gcloud bigtable instances tables create song-rank \
    --column-families=cf --change-stream-retention-period=7d \
    --instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
    

    次のように置き換えます。

    • PROJECT_ID: 使用しているプロジェクトの ID
    • BIGTABLE_INSTANCE_ID: 新しいテーブルを含むインスタンスの ID

    パイプラインを開始する

    このパイプラインは、次の手順で変更ストリームを変換します。

    1. 変更ストリームを読み取る
    2. 曲名を取得する
    3. 曲のリッスン イベントを N 秒のウィンドウにグループ化する
    4. トップ 5 の曲をカウントする
    5. 結果を出力する

    パイプラインを実行します。

    mvn compile exec:java -Dexec.mainClass=SongRank \
    "-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
    --bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
    --outputLocation=gs://BUCKET_NAME/ \
    --runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
    

    BIGTABLE_REGION は、Bigtable インスタンスが配置されているリージョンの ID(us-east5 など)に置き換えます。

    パイプラインを理解する

    パイプラインの次のコード スニペットは、実行中のコードを理解するのに役立ちます。

    変更ストリームの読み取り

    このサンプルコードでは、特定の Bigtable インスタンスとテーブルのパラメータを使ってソース ストリームを構成しています。

    p.apply(
            "Stream from Bigtable",
            BigtableIO.readChangeStream()
                .withProjectId(options.getBigtableProjectId())
                .withInstanceId(options.getBigtableInstanceId())
                .withTableId(options.getBigtableTableId())
                .withAppProfileId(options.getBigtableAppProfile())
    
        )

    曲名の取得

    曲が再生されると、曲名は列ファミリー cf と列修飾子 song に書き込まれます。コードは変更ストリームのミューテーションから値を抽出し、パイプラインの次のステップに出力します。

    private static class ExtractSongName extends DoFn<KV<ByteString, ChangeStreamMutation>, String> {
    
      @DoFn.ProcessElement
      public void processElement(ProcessContext c) {
    
        for (Entry e : Objects.requireNonNull(Objects.requireNonNull(c.element()).getValue())
            .getEntries()) {
          if (e instanceof SetCell) {
            SetCell setCell = (SetCell) e;
            if ("cf".equals(setCell.getFamilyName())
                && "song".equals(setCell.getQualifier().toStringUtf8())) {
              c.output(setCell.getValue().toStringUtf8());
            }
          }
        }
      }
    }

    トップ 5 の曲のカウント

    組み込みの Beam 関数 CountTop.of を使用して、現在のウィンドウで上位 5 つの曲を取得できます。

    .apply(Count.perElement())
    .apply("Top songs", Top.of(5, new SongComparator()).withoutDefaults())

    結果の出力

    このパイプラインは、結果を標準出力とファイルに書き込みます。ファイルの場合は、書き込みが 10 個の要素または 1 分間のセグメントのグループに分割されます。

    .apply("Print", ParDo.of(new PrintFn()))
    .apply(
        "Collect at least 10 elements or 1 minute of elements",
        Window.<String>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(10),
                        AfterProcessingTime
                            .pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(1)
                            )
                    )
                ))
            .discardingFiredPanes())
    .apply(
        "Output top songs",
        TextIO.write()
            .to(options.getOutputLocation() + "song-charts/")
            .withSuffix(".txt")
            .withNumShards(1)
            .withWindowedWrites()
    );

    パイプラインを表示する

    1. Google Cloud コンソールの [Dataflow] ページに移動します。

      Dataflow に移動

    2. song-rank で始まる名前のジョブをクリックします。

    3. 画面下部の [表示] をクリックして、ログパネルを開きます。

    4. [ワーカーログ] をクリックして、変更ストリームの出力ログをモニタリングします。

    ストリームの書き込み

    cbt CLI を使用して、さまざまなユーザーの曲の再生回数を song-rank テーブルに書き込みます。これは、時間の経過とともにストリーミングされる曲の再生をシミュレートするように数分間にわたって書き込むように設計されています。

    cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
    song-rank song-rank-data.csv  column-family=cf batch-size=1
    

    出力を表示する

    最も人気のある曲を確認するために、Cloud Storage の出力を読み取ります。

    gsutil cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
    

    出力例:

    2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
    2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
    2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]
    
  • クリーンアップ

    このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。

    プロジェクトの削除

      Delete a Google Cloud project:

      gcloud projects delete PROJECT_ID

    リソースを個別に削除する

    1. バケットとファイルを削除します。

      gcloud storage rm --recursive gs://BUCKET_NAME/
      
    2. テーブルで変更ストリームを無効にします。

      gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \
      --clear-change-stream-retention-period
      
    3. テーブル song-rank を削除します。

      cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rank
      
    4. 変更ストリーム パイプラインを停止します。

      1. ジョブを一覧表示し、ジョブ ID を取得します。

        gcloud dataflow jobs list --region=BIGTABLE_REGION
        
      2. ジョブをキャンセルします。

        gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGION
        

        JOB_ID は、前のコマンドの後に表示されたジョブ ID に置き換えます。

    次のステップ