Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本頁面說明如何在 Cloud Composer 環境中管理 DAG。
Cloud Composer 使用 Cloud Storage 值區來儲存 Cloud Composer 環境的 DAG。環境會將這個 bucket 中的 DAG 同步至 Airflow 元件,例如 Airflow 工作站和排程器。
事前準備
- Apache Airflow 不提供嚴密的 DAG 隔離機制,因此建議您將實際工作環境和測試環境分開,以免發生 DAG 干擾情形。詳情請參閱測試 DAG 一文。
- 請確認帳戶具備足夠的權限,可管理 DAG。
- DAG 變更會在 3 到 5 分鐘內傳播至 Airflow。您可以在 Airflow 網頁介面中查看工作狀態。
存取環境的值區
如要存取與環境相關聯的 bucket,請按照下列步驟操作:
主控台
前往 Google Cloud 控制台的「Environments」頁面。
在環境清單中,找到環境名稱所在的資料列,然後按一下「DAGs folder」(DAG 資料夾) 欄中的「DAGs」(DAG) 連結。「Bucket details」(值區詳細資料) 頁面隨即開啟。顯示環境值區中
/dags
資料夾的內容。
gcloud
gcloud CLI 提供個別指令,可在環境的 bucket 中新增和刪除 DAG。
如要與環境的值區互動,也可以使用 Google Cloud CLI。如要取得環境 bucket 的位址,請執行下列 gcloud CLI 指令:
gcloud composer environments describe ENVIRONMENT_NAME \
--location LOCATION \
--format="get(config.dagGcsPrefix)"
取代:
- 將
ENVIRONMENT_NAME
替換為環境的名稱。 LOCATION
改成環境所在的地區。
範例:
gcloud beta composer environments describe example-environment \
--location us-central1 \
--format="get(config.dagGcsPrefix)"
API
建構 environments.get
API 要求。在 Environment 資源的 EnvironmentConfig 資源中,dagGcsPrefix
資源是環境的 bucket 位址。
範例:
GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment
Python
使用 google-auth 程式庫取得憑證,並使用 requests 程式庫呼叫 REST API。
新增或更新 DAG
如要新增或更新 DAG,請將 DAG 的 Python .py
檔案移至環境 bucket 中的 /dags
資料夾。
主控台
前往 Google Cloud 控制台的「Environments」頁面。
在環境清單中,找到環境名稱所在的資料列,然後按一下「DAGs folder」(DAG 資料夾) 欄中的「DAGs」(DAG) 連結。「Bucket details」(值區詳細資料) 頁面隨即開啟。顯示環境值區中
/dags
資料夾的內容。按一下「上傳檔案」。然後使用瀏覽器的對話方塊選取 DAG 的 Python
.py
檔案,並確認。
gcloud
gcloud composer environments storage dags import \
--environment ENVIRONMENT_NAME \
--location LOCATION \
--source="LOCAL_FILE_TO_UPLOAD"
取代:
- 將
ENVIRONMENT_NAME
替換為環境的名稱。 LOCATION
改成環境所在的地區。LOCAL_FILE_TO_UPLOAD
是 DAG 的 Python.py
檔案。
範例:
gcloud composer environments storage dags import \
--environment example-environment \
--location us-central1 \
--source="example_dag.py"
更新有進行中 DAG 執行作業的 DAG
如果您更新有作用中 DAG 執行作業的 DAG:
- 所有目前執行的工作都會使用原始 DAG 檔案完成。
- 所有已排定但目前未執行的工作,都會使用更新後的 DAG 檔案。
- 更新後的 DAG 檔案中不再出現的所有工作,都會標示為已移除。
更新經常執行的 DAG
上傳 DAG 檔案後,Airflow 需要一段時間載入該檔案並更新 DAG。如果 DAG 經常排定執行,建議您確認 DAG 使用的是更新後的 DAG 檔案版本。方法如下:
在 Airflow 使用者介面中暫停 DAG。
上傳更新後的 DAG 檔案。
等待 Airflow UI 顯示更新。這表示排程器已正確剖析 DAG,並在 Airflow 資料庫中更新。
如果 Airflow UI 顯示更新後的 DAG,這不代表 Airflow 工作人員擁有更新後的 DAG 檔案版本。這是因為排程器和工作站會獨立同步處理 DAG 檔案。
您可能需要延長等待時間,確保 DAG 檔案已與環境中的所有工作站同步。系統每分鐘會同步處理數次。在正常環境中,等待約 20 到 30 秒就足以讓所有工作站完成同步。
(選用) 如要確保所有工作站都使用新版 DAG 檔案,請檢查每個工作站的記錄。方法如下:
在 Google Cloud 控制台中,開啟環境的「記錄」分頁。
依序前往「Composer logs」(Composer 記錄) >「Infrastructure」(基礎架構) >「Cloud Storage sync」(Cloud Storage 同步) 項目,並檢查環境中每個工作人員的記錄。找出最近的
Syncing dags directory
記錄項目,該項目必須有您上傳新 DAG 檔案後的時間戳記。如果看到Finished syncing
項目,表示 DAG 已在這個工作站上成功同步。
取消暫停 DAG。
刪除環境中的 DAG
如要刪除 DAG,請從環境值區的環境 /dags
資料夾中,移除 DAG 的 Python .py
檔案。
主控台
前往 Google Cloud 控制台的「Environments」頁面。
在環境清單中,找到環境名稱所在的資料列,然後按一下「DAGs folder」(DAG 資料夾) 欄中的「DAGs」(DAG) 連結。「Bucket details」(值區詳細資料) 頁面隨即開啟。顯示環境值區中
/dags
資料夾的內容。選取 DAG 檔案,按一下「刪除」,然後確認作業。
gcloud
gcloud composer environments storage dags delete \
--environment ENVIRONMENT_NAME \
--location LOCATION \
DAG_FILE
取代:
- 將
ENVIRONMENT_NAME
替換為環境的名稱。 LOCATION
改成環境所在的地區。DAG_FILE
,並使用 DAG 的 Python.py
檔案。
範例:
gcloud composer environments storage dags delete \
--environment example-environment \
--location us-central1 \
example_dag.py
從 Airflow UI 移除 DAG
如要從 Airflow 網頁介面移除 DAG 的中繼資料,請按照下列步驟操作:
Airflow UI
- 前往環境的 Airflow UI。
- 如要刪除 DAG,請按一下「Delete DAG」。
gcloud
在 Airflow 1 版本 (1.14.0 之前的版本) 中,請在 gcloud CLI 中執行下列指令:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
delete_dag -- DAG_NAME
在 Airflow 2、Airflow 1.14.0 和更新版本中,請在 gcloud CLI 中執行下列指令:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags delete -- DAG_NAME
取代:
- 將
ENVIRONMENT_NAME
替換為環境的名稱。 LOCATION
改成環境所在的地區。DAG_NAME
是要刪除的 DAG 名稱。