Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本頁面說明如何在 Cloud Composer 環境中管理 DAG。
Cloud Composer 使用 Cloud Storage 值區來儲存 Cloud Composer 環境的 DAG。環境會將這個 bucket 中的 DAG 同步至 Airflow 元件,例如 Airflow 工作站和排程器。
事前準備
- Apache Airflow 不提供嚴密的 DAG 隔離機制,因此建議您將實際工作環境和測試環境分開,以免發生 DAG 干擾情形。詳情請參閱測試 DAG 一文。
- 請確認帳戶具備足夠的權限,可管理 DAG。
- DAG 變更會在 3 到 5 分鐘內傳播至 Airflow。您可以在 Airflow 網頁介面中查看工作狀態。
存取環境的值區
如要存取與環境相關聯的 bucket,請按照下列步驟操作:
主控台
前往 Google Cloud 控制台的「Environments」頁面。
在環境清單中,找到環境名稱所在的資料列,然後按一下「DAGs folder」(DAG 資料夾) 欄中的「DAGs」(DAG) 連結。「Bucket details」(值區詳細資料) 頁面隨即開啟。顯示環境值區中
/dags
資料夾的內容。
gcloud
gcloud CLI 提供個別指令,可在環境的 bucket 中新增和刪除 DAG。
如要與環境的值區互動,也可以使用 Google Cloud CLI。如要取得環境 bucket 的位址,請執行下列 gcloud CLI 指令:
gcloud composer environments describe ENVIRONMENT_NAME \
--location LOCATION \
--format="get(config.dagGcsPrefix)"
取代:
- 將
ENVIRONMENT_NAME
替換為環境的名稱。 LOCATION
改成環境所在的地區。
範例:
gcloud beta composer environments describe example-environment \
--location us-central1 \
--format="get(config.dagGcsPrefix)"
API
建構 environments.get
API 要求。在 Environment 資源的 EnvironmentConfig 資源中,dagGcsPrefix
資源是環境的 bucket 位址。
範例:
GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment
Python
使用 google-auth 程式庫取得憑證,並使用 requests 程式庫呼叫 REST API。
新增或更新 DAG
如要新增或更新 DAG,請將 DAG 的 Python .py
檔案移至環境 bucket 中的 /dags
資料夾。
主控台
前往 Google Cloud 控制台的「Environments」頁面。
在環境清單中,找到環境名稱所在的資料列,然後按一下「DAGs folder」(DAG 資料夾) 欄中的「DAGs」(DAG) 連結。「Bucket details」(值區詳細資料) 頁面隨即開啟。顯示環境值區中
/dags
資料夾的內容。按一下「上傳檔案」。然後使用瀏覽器的對話方塊選取 DAG 的 Python
.py
檔案,並確認。
gcloud
gcloud composer environments storage dags import \
--environment ENVIRONMENT_NAME \
--location LOCATION \
--source="LOCAL_FILE_TO_UPLOAD"
取代:
- 將
ENVIRONMENT_NAME
替換為環境的名稱。 LOCATION
改成環境所在的地區。LOCAL_FILE_TO_UPLOAD
是 DAG 的 Python.py
檔案。
範例:
gcloud composer environments storage dags import \
--environment example-environment \
--location us-central1 \
--source="example_dag.py"
更新有進行中 DAG 執行作業的 DAG
如果您更新有作用中 DAG 執行作業的 DAG:
- 所有目前執行的工作都會使用原始 DAG 檔案完成。
- 所有已排定但目前未執行的工作,都會使用更新後的 DAG 檔案。
- 更新後的 DAG 檔案中不再出現的所有工作,都會標示為已移除。
更新經常執行的 DAG
上傳 DAG 檔案後,Airflow 需要一段時間載入該檔案並更新 DAG。如果 DAG 經常排定執行,建議您確認 DAG 使用的是更新後的 DAG 檔案版本。方法如下:
在 Airflow 使用者介面中暫停 DAG。
上傳更新後的 DAG 檔案。
等待 Airflow UI 顯示更新。這表示排程器已正確剖析 DAG,並在 Airflow 資料庫中更新。
如果 Airflow UI 顯示更新後的 DAG,這不代表 Airflow 工作人員擁有更新後的 DAG 檔案版本。這是因為排程器和工作站會獨立同步處理 DAG 檔案。
您可能需要延長等待時間,確保 DAG 檔案已與環境中的所有工作站同步。系統每分鐘會同步處理數次。在正常環境中,等待約 20 到 30 秒就足以讓所有工作站完成同步。
(選用) 如要確保所有工作站都使用新版 DAG 檔案,請檢查每個工作站的記錄。方法如下:
在 Google Cloud 控制台中,開啟環境的「記錄」分頁。
依序前往「Composer logs」(Composer 記錄) >「Infrastructure」(基礎架構) >「Cloud Storage sync」(Cloud Storage 同步) 項目,並檢查環境中每個工作人員的記錄。找出最近的
Syncing dags directory
記錄項目,該項目必須有您上傳新 DAG 檔案後的時間戳記。如果看到Finished syncing
項目,表示 DAG 已在這個工作站上成功同步。
取消暫停 DAG。
重新剖析 DAG
由於 DAG 儲存在環境的值區中,因此每個 DAG 會先同步至 DAG 處理器,然後 DAG 處理器會短暫延遲剖析 DAG。如果您手動重新剖析 DAG (例如透過 Airflow UI),DAG 處理器會重新剖析可用的目前 DAG 版本,但這可能不是您上傳至環境 Bucket 的最新 DAG 版本。
如果剖析時間過長,建議您只使用隨選重新剖析。舉例來說,如果環境中有大量檔案,或是 Airflow 設定選項中設定了較長的 DAG 剖析間隔,就可能發生這種情況。
刪除環境中的 DAG
如要刪除 DAG,請從環境值區的環境 /dags
資料夾中,移除 DAG 的 Python .py
檔案。
主控台
前往 Google Cloud 控制台的「Environments」頁面。
在環境清單中,找到環境名稱所在的資料列,然後按一下「DAGs folder」(DAG 資料夾) 欄中的「DAGs」(DAG) 連結。「Bucket details」(值區詳細資料) 頁面隨即開啟。顯示環境值區中
/dags
資料夾的內容。選取 DAG 檔案,按一下「刪除」,然後確認作業。
gcloud
gcloud composer environments storage dags delete \
--environment ENVIRONMENT_NAME \
--location LOCATION \
DAG_FILE
取代:
- 將
ENVIRONMENT_NAME
替換為環境的名稱。 LOCATION
改成環境所在的地區。DAG_FILE
,並使用 DAG 的 Python.py
檔案。
範例:
gcloud composer environments storage dags delete \
--environment example-environment \
--location us-central1 \
example_dag.py
從 Airflow UI 移除 DAG
如要從 Airflow 網頁介面移除 DAG 的中繼資料,請按照下列步驟操作:
Airflow UI
- 前往環境的 Airflow UI。
- 如要刪除 DAG,請按一下「Delete DAG」。
gcloud
在 gcloud CLI 中執行下列指令:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags delete -- DAG_NAME
取代:
- 將
ENVIRONMENT_NAME
替換為環境的名稱。 LOCATION
改成環境所在的地區。DAG_NAME
是要刪除的 DAG 名稱。