本教學課程會使用 Pub/Sub 訂閱項目到 BigQuery 範本,透過 Google Cloud 控制台或 Google Cloud CLI 建立及執行 Dataflow 範本作業。本教學課程將逐步說明串流管道範例,該管道會從 Pub/Sub 讀取以 JSON 編碼的訊息,並將這些訊息寫入 BigQuery 資料表。
串流分析和資料整合管道會使用 Pub/Sub 擷取及發布資料。Pub/Sub 可讓您建立事件生產者和消費者系統,也就是發布者和訂閱者。發布者會以非同步方式將事件傳送至 Pub/Sub 服務,而 Pub/Sub 會將事件傳送至所有需要做出反應的服務。
Dataflow 是一項全代管服務,可轉換及充實串流 (即時) 和批次模式的資料。這個服務提供簡化的管道開發環境,可使用 Apache Beam SDK 轉換輸入資料,然後輸出轉換後的資料。
這個工作流程的優點是,您可以使用 UDF 轉換訊息資料,再將資料寫入 BigQuery。
針對這個情境執行 Dataflow 管道之前,請考慮Pub/Sub BigQuery 訂閱項目搭配 UDF 是否符合您的需求。
目標
- 建立 Pub/Sub 主題。
- 建立含有資料表和結構定義的 BigQuery 資料集。
- 使用 Google 提供的串流範本,透過 Dataflow 將 Pub/Sub 訂閱項目中的資料串流至 BigQuery。
費用
在本文件中,您會使用 Google Cloud的下列計費元件:
- Dataflow
- Pub/Sub
- Cloud Storage
- BigQuery
如要根據預測用量估算費用,請使用 Pricing Calculator。
完成本文所述工作後,您可以刪除已建立的資源,避免繼續計費。詳情請參閱清除所用資源一節。
事前準備
本節說明如何選取專案、啟用 API,以及將適當的角色授予使用者帳戶和工作者服務帳戶。
控制台
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.
如要完成本教學課程中的步驟,使用者帳戶必須具備「服務帳戶使用者」角色。Compute Engine 預設服務帳戶必須具備下列角色:Dataflow 工作者、Dataflow 管理員、Pub/Sub 編輯者、Storage 物件管理員和 BigQuery 資料編輯者。如要在 Google Cloud 控制台中新增必要角色,請按照下列步驟操作:
前往 Google Cloud 控制台的「IAM」頁面。
前往「IAM」頁面- 選取專案。
- 在包含您使用者帳戶的資料列中,按一下 「Edit principal」(編輯主體),然後按一下 「Add another role」(新增其他角色)。
- 在下拉式清單中,選取「服務帳戶使用者」角色。
- 在包含 Compute Engine 預設服務帳戶的資料列中,按一下 「Edit principal」(編輯主體),然後按一下 「Add another role」(新增其他角色)。
- 在下拉式清單中,選取「Dataflow Worker」(Dataflow 工作者) 角色。
針對「Dataflow 管理員」、「Pub/Sub 編輯者」、「Storage 物件管理員」和「BigQuery 資料編輯者」角色重複上述步驟,然後按一下「儲存」。
如要進一步瞭解如何授予角色,請參閱「使用控制台授予 IAM 角色」。
gcloud
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com storage.googleapis.com cloudresourcemanager.googleapis.com -
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com storage.googleapis.com cloudresourcemanager.googleapis.com -
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
-
將角色授予 Compute Engine 預設服務帳戶。針對下列每個 IAM 角色,執行一次下列指令:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.admin
roles/pubsub.editor
roles/bigquery.dataEditor
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
更改下列內容:
PROJECT_ID
:您的專案 ID。PROJECT_NUMBER
:您的專案編號。 如要找出專案編號,請使用gcloud projects describe
指令。SERVICE_ACCOUNT_ROLE
:每個角色。
建立 Cloud Storage 值區
首先,請使用 Google Cloud 控制台或 Google Cloud CLI 建立 Cloud Storage bucket。Dataflow 管道會將這個 bucket 做為暫時儲存位置。
控制台
在 Google Cloud 控制台,前往「Cloud Storage bucket」頁面。
點選「建立」。
在「建立 bucket」頁面的「為 bucket 命名」欄位中,輸入符合bucket 命名規定的名稱。 Cloud Storage 值區名稱不得重複。請勿選取其他選項。
點選「建立」。
gcloud
使用 gcloud storage buckets create
指令:
gcloud storage buckets create gs://BUCKET_NAME
將 BUCKET_NAME
替換為符合值區命名規定的 Cloud Storage 值區名稱。Cloud Storage bucket 名稱不得重複。
建立 Pub/Sub 主題和訂閱項目
建立 Pub/Sub 主題,然後建立該主題的訂閱項目。
控制台
如要建立主題,請完成下列步驟。
前往 Google Cloud 控制台的 Pub/Sub「主題」頁面。
按一下「建立主題」。
在「主題 ID」欄位中,輸入主題的 ID。如要瞭解如何命名主題,請參閱主題或訂閱項目命名規範。
保留「新增預設訂閱項目」選項。 請勿選取其他選項。
點選「建立」。
- 在主題詳細資料頁面中,建立的訂閱項目名稱會列在「Subscription ID」(訂閱 ID) 下方。請記下這個值,以供後續步驟使用。
gcloud
如要建立主題,請執行 gcloud pubsub topics create
指令。如要瞭解如何命名訂閱項目,請參閱「主題或訂閱項目命名規範」。
gcloud pubsub topics create TOPIC_ID
將 TOPIC_ID
替換為 Pub/Sub 主題的名稱。
如要為主題建立訂閱項目,請執行 gcloud pubsub subscriptions create
指令:
gcloud pubsub subscriptions create --topic TOPIC_ID SUBSCRIPTION_ID
將 SUBSCRIPTION_ID
換成 Pub/Sub 訂閱名稱。
建立 BigQuery 資料表
在本步驟中,您將建立具有下列結構定義的 BigQuery 資料表:
資料欄名稱 | 資料類型 |
---|---|
name |
STRING |
customer_id |
INTEGER |
如果沒有 BigQuery 資料集,請先建立一個。詳情請參閱建立資料集。接著建立新的空白資料表:
主控台
前往「BigQuery」頁面
在「Explorer」窗格中展開專案,然後選取資料集。
在「資料集」資訊部分,按一下
「建立資料表」。在「Create table from」(建立資料表來源) 清單中,選取「Empty table」(空白資料表)。
在「Table」(資料表) 方塊中,輸入資料表名稱。
在「Schema」(結構定義) 區段中,按一下「以文字形式編輯」。
貼上下列結構定義:
name:STRING, customer_id:INTEGER
點選「建立資料表」。
gcloud
使用 bq mk
指令。
bq mk --table \
PROJECT_ID:DATASET_NAME.TABLE_NAME \
name:STRING,customer_id:INTEGER
更改下列內容:
PROJECT_ID
:您的專案 IDDATASET_NAME
:資料集名稱TABLE_NAME
:要建立的資料表名稱
執行管道
使用 Google 提供的「Pub/Sub 訂閱到 BigQuery」範本執行串流管道。管道會從 Pub/Sub 主題取得輸入資料,並將資料輸出至 BigQuery 資料集。
控制台
前往 Google Cloud 控制台的 Dataflow「Jobs」(工作) 頁面。
按一下 [Create job from template] (利用範本建立工作)。
在「Job name」(工作名稱) 中輸入 Dataflow 工作的名稱。
在「區域端點」中,選取 Dataflow 工作的區域。
在「Dataflow template」(Dataflow 範本) 欄位中,選取「Pub/Sub Subscription to BigQuery」(Pub/Sub 訂閱項目到 BigQuery) 範本。
在「BigQuery output table」(BigQuery 輸出資料表) 中,選取「Browse」(瀏覽),然後選取 BigQuery 資料表。
在「Pub/Sub input subscription」(Pub/Sub 輸入訂閱項目) 清單中,選取 Pub/Sub 訂閱項目。
在「Temporary location」(臨時位置) 中輸入下列資訊:
gs://BUCKET_NAME/temp/
將
BUCKET_NAME
換成 Cloud Storage 值區名稱。temp
資料夾會儲存 Dataflow 工作的暫存檔案。按一下「Run Job」(執行工作)。
gcloud
如要在殼層或終端機中執行範本,請使用 gcloud dataflow jobs run
指令。
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates-DATAFLOW_REGION/latest/PubSub_Subscription_to_BigQuery \
--region DATAFLOW_REGION \
--staging-location gs://BUCKET_NAME/temp \
--parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME
請替換下列變數:
JOB_NAME
:工作名稱DATAFLOW_REGION
:工作區域PROJECT_ID
:您的 Google Cloud 專案名稱SUBSCRIPTION_ID
:Pub/Sub 訂閱項目的名稱DATASET_NAME
:BigQuery 資料集名稱TABLE_NAME
:BigQuery 資料表的名稱
將訊息發布至 Pub/Sub
Dataflow 工作啟動後,您就可以將訊息發布至 Pub/Sub,管道會將訊息寫入 BigQuery。
控制台
在 Google Cloud 控制台中,依序前往「Pub/Sub」>「Topics」(主題) 頁面。
在主題清單中,按一下主題名稱。
按一下「訊息」。
按一下「發布訊息」。
在「Number of messages」(訊息數量) 中輸入
10
。在「Message body」(訊息內文) 中輸入
{"name": "Alice", "customer_id": 1}
。按一下 [發布]。
gcloud
如要將訊息發布至主題,請使用 gcloud pubsub topics publish
指令。
for run in {1..10}; do
gcloud pubsub topics publish TOPIC_ID --message='{"name": "Alice", "customer_id": 1}'
done
將 TOPIC_ID
替換為主題名稱。
查看結果
查看寫入 BigQuery 資料表的資料。資料最多可能需要一分鐘才會顯示在資料表中。
控制台
前往 Google Cloud 控制台的「BigQuery」頁面。
前往 BigQuery 頁面在查詢編輯器中執行下列查詢:
SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME` LIMIT 1000
請替換下列變數:
PROJECT_ID
:專案名稱 Google CloudDATASET_NAME
:BigQuery 資料集名稱TABLE_NAME
:BigQuery 資料表的名稱
gcloud
執行下列查詢,在 BigQuery 中查看結果:
bq query --use_legacy_sql=false 'SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME`'
請替換下列變數:
PROJECT_ID
:專案名稱 Google CloudDATASET_NAME
:BigQuery 資料集名稱TABLE_NAME
:BigQuery 資料表的名稱
使用 UDF 轉換資料
本教學課程假設 Pub/Sub 訊息的格式為 JSON,且 BigQuery 資料表結構定義與 JSON 資料相符。
您可以選擇提供 JavaScript 使用者定義函式 (UDF),在資料寫入 BigQuery 前轉換資料。UDF 可以執行額外處理作業,例如篩選、移除個人識別資訊 (PII),或使用其他欄位擴充資料。
詳情請參閱「為 Dataflow 範本建立使用者定義函式」。
使用無效信件資料表
工作執行期間,管道可能無法將個別訊息寫入 BigQuery。可能發生的錯誤包括:
- 序列化錯誤,包括格式錯誤的 JSON。
- 類型轉換錯誤,是由於資料表結構定義與 JSON 資料不符所致。
- JSON 資料中不在表格結構定義中的額外欄位。
管道會將這些錯誤寫入 BigQuery 的無法傳送郵件的信件資料表。根據預設,管道會自動建立名為 TABLE_NAME_error_records
的死信資料表,其中 TABLE_NAME
是輸出資料表的名稱。如要使用其他名稱,請設定 outputDeadletterTable
範本參數。
清除所用資源
如要避免系統向您的 Google Cloud 帳戶收取本教學課程中所用資源的相關費用,請刪除含有該項資源的專案,或者保留專案但刪除個別資源。
刪除專案
如要避免付費,最簡單的方法就是刪除您為了本教學課程所建立的專案。 Google Cloud
主控台
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
gcloud
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
刪除個別資源
如果之後想重複使用專案,可以保留專案,但刪除您在教學課程中建立的資源。
停止 Dataflow 管道
控制台
前往 Google Cloud 控制台的 Dataflow「Jobs」(工作) 頁面。
按一下要停止的工作。
如要停止工作,工作狀態必須為「執行中」。
在工作詳細資料頁面中,按一下「停止」。
按一下「取消」。
如要確認所選動作,請按一下「停止工作」。
gcloud
如要取消 Dataflow 工作,請使用 gcloud dataflow jobs
指令。
gcloud dataflow jobs list \
--filter 'NAME=JOB_NAME AND STATE=Running' \
--format 'value(JOB_ID)' \
--region "DATAFLOW_REGION" \
| xargs gcloud dataflow jobs cancel --region "DATAFLOW_REGION"
清除 Google Cloud 專案資源
控制台
刪除 Pub/Sub 主題和訂閱項目。
前往 Google Cloud 控制台的 Pub/Sub「主題」頁面。
選取您建立的主題。
按一下「刪除」以永久刪除主題。
前往 Google Cloud 控制台的 Pub/Sub「Subscriptions」(訂閱項目) 頁面。
選取以主題建立的訂閱項目。
按一下「刪除」即可永久刪除訂閱項目。
刪除 BigQuery 資料表和資料集。
前往 Google Cloud 控制台的「BigQuery」頁面。
在「Explorer」面板中展開專案。
在要刪除的資料集旁邊,依序按一下「View actions」(查看動作)
和「delete」(刪除)。
刪除 Cloud Storage 值區。
在 Google Cloud 控制台,前往「Cloud Storage bucket」頁面。
選取要刪除的 bucket,依序點選
「Delete」(刪除),然後按照指示操作。
gcloud
如要刪除 Pub/Sub 訂閱項目和主題,請使用
gcloud pubsub subscriptions delete
和gcloud pubsub topics delete
指令。gcloud pubsub subscriptions delete SUBSCRIPTION_ID gcloud pubsub topics delete TOPIC_ID
如要刪除 BigQuery 資料表,請使用
bq rm
指令。bq rm -f -t PROJECT_ID:tutorial_dataset.tutorial
刪除 BigQuery 資料集。單獨使用資料集不會產生任何費用。
bq rm -r -f -d PROJECT_ID:tutorial_dataset
如要刪除 Cloud Storage bucket 及其物件,請使用
gcloud storage rm
指令。單獨使用儲存空間不會產生任何費用。gcloud storage rm gs://BUCKET_NAME --recursive
撤銷憑證
控制台
如果保留專案,請撤銷您授予 Compute Engine 預設服務帳戶的角色。
- 前往 Google Cloud 控制台的「IAM」頁面。
選取專案、資料夾或機構。
找到包含要撤銷存取權主體的資料列。 在該列中,按一下
「Edit principal」(編輯主體)。針對您要撤銷的每個角色按一下「刪除」圖示
按鈕,然後按一下「儲存」。
gcloud
- 如果保留專案,請撤銷您授予 Compute Engine 預設服務帳戶的角色。針對下列每個 IAM 角色,執行一次下列指令:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.admin
roles/pubsub.editor
roles/bigquery.dataEditor
gcloud projects remove-iam-policy-binding <var>PROJECT_ID</var> \ --member=serviceAccount:<var>PROJECT_NUMBER</var>-compute@developer.gserviceaccount.com \ --role=<var>ROLE</var>
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
後續步驟
- 使用 UDF 擴充 Dataflow 範本。
- 進一步瞭解如何使用 Dataflow 範本。
- 查看所有 Google 提供的範本。
- 請參閱建立及使用主題的 Pub/Sub 說明,以及如何建立提取訂閱項目。
- 請參閱這篇文章,瞭解如何使用 BigQuery 建立資料集。
- 瞭解 Pub/Sub 訂閱項目。
- 探索 Google Cloud 的參考架構、圖表和最佳做法。 歡迎瀏覽我們的雲端架構中心。