本教學課程說明如何將資料管道部署至 Dataflow,以取得來自 Bigtable 資料表變更串流的資料庫變更即時串流。管道的輸出內容會寫入 Cloud Storage 上的一系列檔案。
我們提供音樂收聽應用程式的範例資料集。在本教學課程中,您會追蹤聆聽的歌曲,然後在一段時間內列出前五名。
本教學課程適用於熟悉程式碼編寫,以及將資料管道部署至 Google Cloud的技術人員。
目標
本教學課程將說明如何執行下列操作:
- 建立啟用變更串流的 Bigtable 資料表。
- 在 Dataflow 上部署管道,轉換並輸出變更串流。
- 查看資料管道的結果。
費用
在本文件中,您會使用 Google Cloud的下列計費元件:
如要根據預測用量估算費用,請使用 Pricing Calculator。
完成本文所述工作後,您可以刪除已建立的資源,避免繼續計費。詳情請參閱清除所用資源一節。
事前準備
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name. -
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name. - 更新並安裝
cbt
CLI 。gcloud components update gcloud components install cbt
-
Create a Cloud Storage bucket and configure it as follows:
-
將
STORAGE_CLASS
替換成您偏好的儲存空間級別。 -
將
LOCATION
替換成您偏好的位置 (ASIA
、EU
或US
) -
將
BUCKET_NAME
替換成 符合值區命名規定的值區名稱。 - PROJECT_ID:您使用的專案 ID
- BIGTABLE_INSTANCE_ID:要包含新資料表的執行個體 ID
- 讀取變更串流
- 取得歌曲名稱
- 將歌曲聆聽事件分組到 N 秒的時間範圍內
- 計算前五名歌曲
- 輸出結果
前往 Google Cloud 控制台的「Dataflow」頁面。
按一下名稱開頭為 song-rank 的工作。
按一下畫面底部的「顯示」,開啟記錄面板。
按一下「Worker logs」(工作人員記錄),即可監控變更串流的輸出記錄。
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STORAGE_CLASS --location LOCATION
建立 Bigtable 執行個體
在本教學課程中,您可以使用現有執行個體,或在您附近的區域建立執行個體,並採用預設設定。
建立資料表
這個範例應用程式會追蹤使用者收聽的歌曲,並將收聽事件儲存在 Bigtable 中。建立啟用變更串流的資料表,其中包含一個資料欄系列 (cf) 和一個資料欄 (song),並使用使用者 ID 做為資料列索引鍵。
建立表格。
gcloud bigtable instances tables create song-rank \ --column-families=cf --change-stream-retention-period=7d \ --instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
更改下列內容:
啟動管道
這個管道會執行下列動作,轉換變更串流:
執行管道。
mvn compile exec:java -Dexec.mainClass=SongRank \ "-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \ --bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \ --outputLocation=gs://BUCKET_NAME/ \ --runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
請將 BIGTABLE_REGION 改成 Bigtable 執行個體所在的區域 ID,例如
us-east5
。瞭解管道
管道中的下列程式碼片段可協助您瞭解執行的程式碼。
讀取變更串流
這個範例中的程式碼會使用特定 Bigtable 執行個體和資料表的參數,設定來源串流。
取得歌曲名稱
當使用者聆聽歌曲時,系統會將歌曲名稱寫入欄系列
cf
和欄限定符song
,因此程式碼會從變更串流突變中擷取值,並輸出至管道的下一個步驟。計算前五名歌曲
您可以使用內建的 Beam 函式
Count
和Top.of
,取得目前視窗中排名前五的歌曲。輸出結果
這個管道會將結果寫入標準輸出內容和檔案。如果是檔案,系統會將寫入作業分組,每組 10 個元素或 1 分鐘的區段。
查看管道
串流寫入
使用
cbt
CLI 將多位使用者收聽歌曲的次數寫入song-rank
資料表。這項設計會在幾分鐘內完成寫入,模擬歌曲收聽串流隨時間累積的狀況。cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \ song-rank song-rank-data.csv column-family=cf batch-size=1
查看輸出內容
讀取 Cloud Storage 的輸出內容,查看最熱門的歌曲。
gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
輸出內容範例:
2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}] 2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}] 2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]
-
將
刪除 bucket 和檔案。
gcloud storage rm --recursive gs://BUCKET_NAME/
停用資料表的變更串流。
gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \ --clear-change-stream-retention-period
刪除資料表
song-rank
。cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rank
停止變更串流管道。
列出工作以取得工作 ID。
gcloud dataflow jobs list --region=BIGTABLE_REGION
取消工作。
gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGION
將 JOB_ID 換成上一個指令顯示的工作 ID。
安裝 Google Cloud CLI。 安裝完成後,執行下列指令初始化 Google Cloud CLI:
gcloud init
如果您使用外部識別資訊提供者 (IdP),請先 使用聯合身分登入 gcloud CLI。
Create or select a Google Cloud project.
Verify that billing is enabled for your Google Cloud project.
Enable the Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:
gcloud services enable dataflow.googleapis.combigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
安裝 Google Cloud CLI。 安裝完成後,執行下列指令初始化 Google Cloud CLI:
gcloud init
如果您使用外部識別資訊提供者 (IdP),請先 使用聯合身分登入 gcloud CLI。
Create or select a Google Cloud project.
Verify that billing is enabled for your Google Cloud project.
Enable the Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:
gcloud services enable dataflow.googleapis.combigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
準備環境
取得程式碼
複製包含範例程式碼的存放區。如果先前已下載這個存放區,請提取最新版本。
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams
建立值區
清除所用資源
如要避免系統向您的 Google Cloud 帳戶收取本教學課程中所用資源的相關費用,請刪除含有該項資源的專案,或者保留專案但刪除個別資源。
刪除專案
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID