Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本教學課程改編自「在 Cloud Composer 中執行資料分析 DAG Google Cloud」,說明如何將 Cloud Composer 環境連線至 Microsoft Azure,以使用儲存在該處的資料。說明如何使用 Cloud Composer 建立 Apache Airflow DAG。DAG 會合併 BigQuery 公開資料集和儲存在 Azure Blob 儲存體中的 CSV 檔案資料,然後執行 Dataproc Serverless 批次工作,處理合併後的資料。
本教學課程使用的 BigQuery 公開資料集是 ghcn_d,這是全球氣候摘要的整合資料庫。CSV 檔案包含 1997 年至 2021 年美國節日的日期和名稱。
我們想透過 DAG 回答的問題是:「過去 25 年來,芝加哥在感恩節當天的氣溫如何?」
目標
- 以預設設定建立 Cloud Composer 環境
- 在 Azure 中建立 Blob
- 建立空白的 BigQuery 資料集
- 建立新的 Cloud Storage bucket
- 建立並執行包含下列工作的 DAG:
- 將外部資料集從 Azure Blob 儲存體載入 Cloud Storage
- 將外部資料集從 Cloud Storage 載入 BigQuery
- 在 BigQuery 中聯結兩個資料集
- 執行資料分析 PySpark 工作
事前準備
啟用 API
啟用下列 API:
主控台
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.
gcloud
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
授予權限
將下列角色和權限授予使用者帳戶:
授予BigQuery 資料擁有者 (
roles/bigquery.dataOwner
) 角色,建立 BigQuery 資料集。授予「Storage 管理員」 (
roles/storage.admin
) 角色,建立 Cloud Storage bucket。
建立及準備 Cloud Composer 環境
建立 Cloud Composer 環境,並使用預設參數:
- 選擇美國境內的區域。
- 選擇最新的 Cloud Composer 版本。
將下列角色授予 Cloud Composer 環境中使用的服務帳戶,Airflow 工作者才能順利執行 DAG 工作:
- BigQuery 使用者 (
roles/bigquery.user
) - BigQuery 資料擁有者 (
roles/bigquery.dataOwner
) - 服務帳戶使用者 (
roles/iam.serviceAccountUser
) - Dataproc 編輯器 (
roles/dataproc.editor
) - Dataproc Worker (
roles/dataproc.worker
)
- BigQuery 使用者 (
在 Google Cloud中建立及修改相關資源
在 Cloud Composer 環境中安裝
apache-airflow-providers-microsoft-azure
PyPI 套件。建立空白的 BigQuery 資料集,並使用下列參數:
- Name (名稱):
holiday_weather
- Region (區域):
US
- Name (名稱):
在
US
多區域中建立新的 Cloud Storage 值區。執行下列指令,在要執行 Dataproc Serverless 的區域中,對預設子網路啟用私人 Google 存取權,以滿足網路需求。建議使用與 Cloud Composer 環境相同的區域。
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
在 Azure 中建立相關資源
建立儲存空間帳戶, 並使用預設設定。
在新建的儲存空間帳戶中,使用預設選項建立容器。
授予儲存體 Blob 委派者角色,給上一步建立的容器。
將 holidays.csv 上傳至 Azure 入口網站,建立區塊 Blob 並採用預設選項。
在 Azure 入口網站中,為上一步建立的區塊 Blob 建立 SAS 權杖。
- 簽署方法:使用者委派金鑰
- 權限:讀取
- 允許的 IP 位址:無
- 允許的通訊協定:僅限 HTTPS
從 Cloud Composer 連線至 Azure
使用 Airflow UI 新增 Microsoft Azure連線:
依序前往「管理」>「連結」。
建立新連線並加入以下設定:
- 連線 ID:
azure_blob_connection
- 連線類型:
Azure Blob Storage
- Blob 儲存空間登入:您的儲存空間帳戶名稱
- Blob 儲存體金鑰:儲存體帳戶的存取金鑰
- Blob 儲存體帳戶連線字串:儲存體帳戶連線字串
- SAS 權杖:從 Blob 產生的 SAS 權杖
- 連線 ID:
使用 Dataproc Serverless 處理資料
探索 PySpark 工作範例
以下程式碼是 PySpark 工作範例,可將攝氏溫度從十分之一度轉換為攝氏度。這項工作會將資料集中的溫度資料轉換為其他格式。
將 PySpark 檔案上傳至 Cloud Storage
如要將 PySpark 檔案上傳至 Cloud Storage,請按照下列步驟操作:
將 data_analytics_process.py 儲存到本機。
前往 Google Cloud 控制台的「Cloud Storage 瀏覽器」頁面:
按一下您稍早建立的 bucket 名稱。
在值區的「物件」分頁中,按一下「上傳檔案」按鈕,在出現的對話方塊中選取
data_analytics_process.py
,然後按一下「開啟」。
資料分析 DAG
探索範例 DAG
DAG 會使用多個運算子轉換及合併資料:
AzureBlobStorageToGCSOperator
會將 Azure 區塊 Blob 中的 holidays.csv 檔案轉移到 Cloud Storage bucket。GCSToBigQueryOperator
會從 Cloud Storage 將 holidays.csv 檔案擷取到您先前建立的 BigQueryholidays_weather
資料集中的新資料表。DataprocCreateBatchOperator
會使用 Dataproc Serverless 建立及執行 PySpark 批次工作。BigQueryInsertJobOperator
會將 holidays.csv 中的資料,與 BigQuery 公開資料集 ghcn_d 中的氣象資料,依據「Date」欄位進行聯結。BigQueryInsertJobOperator
工作是使用 for 迴圈動態產生,這些工作位於TaskGroup
中,方便在 Airflow UI 的圖表檢視畫面中閱讀。
使用 Airflow UI 新增變數
在 Airflow 中,變數是儲存及擷取任意設定或設定的通用方式,可做為簡單的鍵值儲存空間。這個 DAG 會使用 Airflow 變數儲存通用值。如要將這些變數新增至環境,請按照下列步驟操作:
依序前往「管理」>「變數」。
新增下列變數:
gcp_project
:您的專案 ID。gcs_bucket
:您先前建立的值區名稱 (不含gs://
前置字串)。:您要執行 Dataproc 作業的區域,該作業必須符合 Dataproc Serverless 網路需求。
gce_region
這是您先前啟用私人 Google 存取權的區域。dataproc_service_account
:Cloud Composer 環境的服務帳戶。您可以在 Cloud Composer 環境的環境設定分頁中找到這個服務帳戶。azure_blob_name
:您先前建立的 Blob 名稱。azure_container_name
:您先前建立的容器名稱。
將 DAG 上傳至環境的值區
Cloud Composer 會排程環境 bucket 中 /dags
資料夾內的 DAG。如要使用Google Cloud 控制台上傳 DAG,請按照下列步驟操作:
前往 Google Cloud 控制台的「Environments」頁面。
在環境清單的「DAG folder」(DAG 資料夾) 欄中,按一下「DAGs」(DAG) 連結。系統會開啟環境的 DAG 資料夾。
按一下「上傳檔案」。
選取本機上的
azureblobstoretogcsoperator_tutorial.py
,然後按一下「開啟」。
觸發 DAG
在 Cloud Composer 環境中,按一下「DAGs」分頁標籤。
按一下 DAG ID
azure_blob_to_gcs_dag
。按一下「觸發 DAG」。
等待約五到十分鐘,直到看到綠色勾號,表示工作已順利完成。
驗證 DAG 是否成功
前往 Google Cloud 控制台的「BigQuery」BigQuery頁面。
在「Explorer」面板中,按一下專案名稱。
按一下「
holidays_weather_joined
」。按一下「預覽」,查看產生的資料表。請注意,「值」欄中的數字是以攝氏溫度十分之一為單位。
按一下「
holidays_weather_normalized
」。按一下「預覽」,查看產生的資料表。請注意,「值」欄中的數字是以攝氏度為單位。
清除所用資源
刪除您為本教學課程建立的個別資源:
刪除 Cloud Composer 環境,包括手動刪除環境的值區。