このチュートリアルでは、Bigtable テーブルの変更ストリームをソースとするデータベース変更のリアルタイム ストリーム用のデータ パイプラインを Dataflow にデプロイする方法について説明します。パイプラインの出力は Cloud Storage 上の一連のファイルに書き込まれます。
音楽再生アプリケーションのサンプル データセットを用意しています。このチュートリアルでは、再生された曲を追跡し、一定期間にわたり上位 5 位をランク付けします。
このチュートリアルは、コードの記述と Google Cloud へのデータ パイプラインのデプロイに精通している技術ユーザーを対象としています。
目標
このチュートリアルでは、次の方法を説明します。
- 変更ストリームを有効にして Bigtable テーブルを作成する。
- 変更ストリームを変換して出力するパイプラインを Dataflow にデプロイする。
- データ パイプラインの結果を表示する。
費用
このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。
料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。
このドキュメントに記載されているタスクの完了後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。
始める前に
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Google Cloud CLI をインストールします。
-
gcloud CLI を初期化するには:
gcloud init
-
Google Cloud プロジェクトを作成または選択します。
-
Google Cloud プロジェクトを作成します。
gcloud projects create PROJECT_ID
PROJECT_ID
は、作成する Google Cloud プロジェクトの名前に置き換えます。 -
作成した Google Cloud プロジェクトを選択します。
gcloud config set project PROJECT_ID
PROJECT_ID
は、実際の Google Cloud プロジェクト名に置き換えます。
-
-
Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage API を有効にします。
gcloud services enable dataflow.googleapis.com
bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com - Google Cloud CLI をインストールします。
-
gcloud CLI を初期化するには:
gcloud init
-
Google Cloud プロジェクトを作成または選択します。
-
Google Cloud プロジェクトを作成します。
gcloud projects create PROJECT_ID
PROJECT_ID
は、作成する Google Cloud プロジェクトの名前に置き換えます。 -
作成した Google Cloud プロジェクトを選択します。
gcloud config set project PROJECT_ID
PROJECT_ID
は、実際の Google Cloud プロジェクト名に置き換えます。
-
-
Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage API を有効にします。
gcloud services enable dataflow.googleapis.com
bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com cbt
CLI を更新してインストールします。gcloud components update gcloud components install cbt
環境を準備する
コードを取得する
サンプルコードを含むリポジトリのクローンを作成します。このリポジトリをすでにダウンロードしている場合は、pull して最新バージョンにしてください。
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams
バケットの作成
gcloud storage buckets create gs://BUCKET_NAMEReplace
BUCKET_NAME
with a bucket name
that meets the bucket naming requirements.
Bigtable インスタンスを作成する
このチュートリアルでは、既存のインスタンスを使用することも、近くのリージョンにデフォルト構成でインスタンスを作成することもできます。
テーブルを作成する
このサンプル アプリケーションは、ユーザーが再生している曲を追跡し、リッスン イベントを Bigtable に保存します。1 つの列ファミリー(cf)と 1 つの列(曲)があり、行キーにユーザー ID を使用するテーブルを作成し、変更ストリームを有効にします。
テーブルを作成します。
gcloud bigtable instances tables create song-rank \
--column-families=cf --change-stream-retention-period=7d \
--instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
次のように置き換えます。
- PROJECT_ID: 使用しているプロジェクトの ID
- BIGTABLE_INSTANCE_ID: 新しいテーブルを含むインスタンスの ID
パイプラインを開始する
このパイプラインは、次の手順で変更ストリームを変換します。
- 変更ストリームを読み取る
- 曲名を取得する
- 曲のリッスン イベントを N 秒のウィンドウにグループ化する
- トップ 5 の曲をカウントする
- 結果を出力する
パイプラインを実行します。
mvn compile exec:java -Dexec.mainClass=SongRank \
"-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
--bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
--outputLocation=gs://BUCKET_NAME/ \
--runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
BIGTABLE_REGION は、Bigtable インスタンスが配置されているリージョンの ID(us-east5
など)に置き換えます。
パイプラインを理解する
パイプラインの次のコード スニペットは、実行中のコードを理解するのに役立ちます。
変更ストリームの読み取り
このサンプルコードでは、特定の Bigtable インスタンスとテーブルのパラメータを使ってソース ストリームを構成しています。
曲名の取得
曲が再生されると、曲名は列ファミリー cf
と列修飾子 song
に書き込まれます。コードは変更ストリームのミューテーションから値を抽出し、パイプラインの次のステップに出力します。
トップ 5 の曲のカウント
組み込みの Beam 関数 Count
と Top.of
を使用して、現在のウィンドウで上位 5 つの曲を取得できます。
結果の出力
このパイプラインは、結果を標準出力とファイルに書き込みます。ファイルの場合は、書き込みが 10 個の要素または 1 分間のセグメントのグループに分割されます。
パイプラインを表示する
Google Cloud コンソールの [Dataflow] ページに移動します。
song-rank で始まる名前のジョブをクリックします。
画面下部の [表示] をクリックして、ログパネルを開きます。
[ワーカーログ] をクリックして、変更ストリームの出力ログをモニタリングします。
ストリームの書き込み
cbt
CLI を使用して、さまざまなユーザーの曲の再生回数を song-rank
テーブルに書き込みます。これは、時間の経過とともにストリーミングされる曲の再生をシミュレートするように数分間にわたって書き込むように設計されています。
cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
song-rank song-rank-data.csv column-family=cf batch-size=1
出力を表示する
最も人気のある曲を確認するために、Cloud Storage の出力を読み取ります。
gsutil cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
出力例:
2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]
クリーンアップ
このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。
プロジェクトの削除
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
リソースを個別に削除する
バケットとファイルを削除します。
gcloud storage rm --recursive gs://BUCKET_NAME/
テーブルで変更ストリームを無効にします。
gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \ --clear-change-stream-retention-period
テーブル
song-rank
を削除します。cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rank
変更ストリーム パイプラインを停止します。
ジョブを一覧表示し、ジョブ ID を取得します。
gcloud dataflow jobs list --region=BIGTABLE_REGION
ジョブをキャンセルします。
gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGION
JOB_ID は、前のコマンドの後に表示されたジョブ ID に置き換えます。