Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本教學課程說明如何使用 Cloud Composer 建立 Apache Airflow DAG。DAG 會合併 BigQuery 公開資料集和儲存在 Cloud Storage 值區中的 CSV 檔案資料,然後執行 Dataproc Serverless 批次工作,處理合併後的資料。
本教學課程使用的 BigQuery 公開資料集是 ghcn_d,這是全球氣候摘要的整合資料庫。CSV 檔案包含 1997 年至 2021 年美國節日的日期和名稱。
我們想透過 DAG 回答的問題是:「過去 25 年來,芝加哥在感恩節當天的氣溫如何?」
目標
- 以預設設定建立 Cloud Composer 環境
- 建立空白的 BigQuery 資料集
- 建立新的 Cloud Storage bucket
- 建立並執行包含下列工作的 DAG:
- 將外部資料集從 Cloud Storage 載入 BigQuery
- 在 BigQuery 中聯結兩個資料集
- 執行資料分析 PySpark 工作
事前準備
啟用 API
啟用下列 API:
主控台
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.
gcloud
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
授予權限
將下列角色和權限授予使用者帳戶:
授予BigQuery 資料擁有者 (
roles/bigquery.dataOwner
) 角色,建立 BigQuery 資料集。授予「Storage 管理員」 (
roles/storage.admin
) 角色,建立 Cloud Storage bucket。
建立及準備 Cloud Composer 環境
建立 Cloud Composer 環境,並使用預設參數:
- 選擇美國境內的區域。
- 選擇最新的 Cloud Composer 版本。
將下列角色授予 Cloud Composer 環境中使用的服務帳戶,Airflow 工作者才能順利執行 DAG 工作:
- BigQuery 使用者 (
roles/bigquery.user
) - BigQuery 資料擁有者 (
roles/bigquery.dataOwner
) - 服務帳戶使用者 (
roles/iam.serviceAccountUser
) - Dataproc 編輯器 (
roles/dataproc.editor
) - Dataproc Worker (
roles/dataproc.worker
)
- BigQuery 使用者 (
建立相關資源
建立空白的 BigQuery 資料集,並使用下列參數:
- Name (名稱):
holiday_weather
- Region (區域):
US
- Name (名稱):
在
US
多區域中建立新的 Cloud Storage 值區。執行下列指令,在要執行 Dataproc Serverless 的區域中,對預設子網路啟用私人 Google 存取權,以滿足網路需求。建議使用與 Cloud Composer 環境相同的區域。
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
使用 Dataproc Serverless 處理資料
探索 PySpark 工作範例
以下程式碼是 PySpark 工作範例,可將攝氏溫度從十分之一度轉換為攝氏度。這項工作會將資料集中的溫度資料轉換為其他格式。
將支援檔案上傳至 Cloud Storage
如要上傳 PySpark 檔案和儲存在 holidays.csv
中的資料集,請按照下列步驟操作:
將 data_analytics_process.py 儲存到本機。
將 holidays.csv 儲存到本機。
前往 Google Cloud 控制台的「Cloud Storage 瀏覽器」頁面:
按一下您稍早建立的 bucket 名稱。
在值區的「物件」分頁中,按一下「上傳檔案」按鈕,在出現的對話方塊中選取
data_analytics_process.py
和holidays.csv
,然後按一下「開啟」。
資料分析 DAG
探索範例 DAG
DAG 會使用多個運算子轉換及合併資料:
GCSToBigQueryOperator
會從 Cloud Storage 將 holidays.csv 檔案擷取到您先前建立的 BigQueryholidays_weather
資料集中的新資料表。DataprocCreateBatchOperator
會使用 Dataproc Serverless 建立及執行 PySpark 批次工作。BigQueryInsertJobOperator
會將 holidays.csv 中的資料,與 BigQuery 公開資料集 ghcn_d 中的氣象資料,依據「Date」欄位進行聯結。BigQueryInsertJobOperator
工作是使用 for 迴圈動態產生,這些工作位於TaskGroup
中,方便在 Airflow UI 的圖表檢視畫面中閱讀。
使用 Airflow UI 新增變數
在 Airflow 中,變數是儲存及擷取任意設定或設定的通用方式,可做為簡單的鍵值儲存空間。這個 DAG 會使用 Airflow 變數儲存通用值。如要將這些變數新增至環境,請按照下列步驟操作:
依序前往「管理」>「變數」。
新增下列變數:
gcp_project
:您的專案 ID。gcs_bucket
:您先前建立的值區名稱 (不含gs://
前置字串)。:您要執行 Dataproc 作業的區域,該作業必須符合 Dataproc Serverless 網路需求。
gce_region
這是您先前啟用私人 Google 存取權的區域。dataproc_service_account
:Cloud Composer 環境的服務帳戶。您可以在 Cloud Composer 環境的環境設定分頁中找到這個服務帳戶。
將 DAG 上傳至環境的值區
Cloud Composer 會排程環境 bucket 中 /dags
資料夾內的 DAG。如要使用Google Cloud 控制台上傳 DAG,請按照下列步驟操作:
在本機電腦上儲存 data_analytics_dag.py。
前往 Google Cloud 控制台的「Environments」頁面。
在環境清單的「DAG folder」(DAG 資料夾) 欄中,按一下「DAGs」(DAG) 連結。系統會開啟環境的 DAG 資料夾。
按一下「上傳檔案」。
選取本機上的
data_analytics_dag.py
,然後按一下「開啟」。
觸發 DAG
在 Cloud Composer 環境中,按一下「DAGs」分頁標籤。
按一下 DAG ID
data_analytics_dag
。按一下「觸發 DAG」。
等待約五到十分鐘,直到看到綠色勾號,表示工作已順利完成。
驗證 DAG 是否成功
前往 Google Cloud 控制台的「BigQuery」BigQuery頁面。
在「Explorer」面板中,按一下專案名稱。
按一下「
holidays_weather_joined
」。按一下「預覽」,查看產生的資料表。請注意,「值」欄中的數字是以攝氏溫度十分之一為單位。
按一下「
holidays_weather_normalized
」。按一下「預覽」,查看產生的資料表。請注意,「值」欄中的數字是以攝氏度為單位。
深入瞭解無伺服器型 Dataproc (選用)
您可以試用這個 DAG 的進階版本,瞭解更複雜的 PySpark 資料處理流程。請參閱 GitHub 上的「Dataproc extension for the Data Analytics Example」。
清除所用資源
刪除您為本教學課程建立的個別資源:
刪除 Cloud Composer 環境,包括手動刪除環境的值區。