Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本頁面提供 Airflow 排程器和 DAG 處理器常見問題的疑難排解步驟和資訊。
找出問題來源
如要開始排解問題,請先判斷問題發生在:
- 在 DAG 剖析期間,Airflow DAG 處理器會剖析 DAG
- 執行期間,Airflow 排程器處理 DAG 時
如要進一步瞭解剖析時間和執行時間,請參閱「DAG 剖析時間和 DAG 執行時間的差異」。
檢查 DAG 處理問題
監控執行中和已排入佇列的工作
如要檢查是否有工作停滯在佇列中,請按照下列步驟操作。
前往 Google Cloud 控制台的「Environments」頁面。
在環境清單中,按一下環境名稱。 「環境詳細資料」頁面隨即開啟。
前往「監控」分頁。
在「監控」分頁中,查看「DAG 執行」部分的「Airflow 工作」圖表,找出可能的問題。Airflow 工作是指在 Airflow 中處於佇列狀態的工作,這些工作可前往 Celery 或 Kubernetes 執行器代理程式佇列。Celery 佇列中的工作是指已排入 Celery 代理程式佇列中的工作執行個體。
排解 DAG 剖析時的問題
以下各節說明 DAG 剖析時發生的一些常見問題,以及可能的修正方式。
Cloud Composer 1 和 Airflow 1 中的 DAG 剖析和排程
Airflow 2 大幅提升了 DAG 剖析效率。如果遇到與 DAG 剖析和排程相關的效能問題,建議遷移至 Airflow 2。
在 Cloud Composer 1 中,排程器會與其他 Cloud Composer 元件一起在叢集節點上執行。因此,與其他節點相比,個別叢集節點的負載可能會較高或較低。排程器的效能 (DAG 剖析和排程) 可能會因排程器執行的節點而異。此外,排程器執行的個別節點可能會因升級或維護作業而變更。Cloud Composer 2 已解決這項限制,您可以將 CPU 和記憶體資源分配給排程器,排程器的效能也不會受到叢集節點負載的影響。
工作數量和時間分布
如果同時排定大量 DAG 或工作,Airflow 可能會發生問題。如要避免預約問題,請採取下列做法:
- 調整 DAG,減少任務數量並整合任務。
- 調整 DAG 的排程間隔,讓 DAG 執行作業更平均地分配到各個時間點。
調整 Airflow 設定
Airflow 提供 Airflow 設定選項,可控管 Airflow 同時執行的工作和 DAG 數量。如要設定這些設定選項,請覆寫環境中的值。您也可以在 DAG 或工作層級設定部分值。
-
[celery]worker_concurrency
參數可控管 Airflow 工作站可同時執行的工作數量上限。如果將這個參數的值乘以 Cloud Composer 環境中的 Airflow 工作站數量,即可得出環境在特定時間可執行的最大工作數量。這個數字受限於[core]parallelism
Airflow 設定選項,詳情請參閱下文。 -
[core]max_active_runs_per_dag
Airflow 設定選項可控制每個 DAG 的有效 DAG 執行數上限。如果達到這項限制,排程器就不會建立更多 DAG 執行作業。如果這個參數設定錯誤,排程器可能會因為無法在特定時間建立更多 DAG 執行個體,而限制 DAG 執行次數。
您也可以使用
max_active_runs
參數,在 DAG 層級設定這個值。 -
[core]max_active_tasks_per_dag
Airflow 設定選項可控管每個 DAG 中可同時執行的工作執行個體數量上限。如果這個參數設定錯誤,您可能會遇到問題,因為在特定時間只能執行有限數量的 DAG 工作,導致單一 DAG 例項的執行速度緩慢。在這種情況下,您可以增加這個設定選項的值。
您也可以使用
max_active_tasks
參數,在 DAG 層級設定這個值。您可以在工作層級使用
max_active_tis_per_dag
和max_active_tis_per_dagrun
參數,控管每個 DAG 和 DAG 執行作業允許執行的特定工作 ID 執行個體數量。 平行處理和集區大小
[core]parallelism
Airflow 設定選項會控管 Airflow 排程器在滿足這些工作的所有依附元件後,可在執行器的佇列中排定多少工作。這是整個 Airflow 設定的全域參數。
工作會在集區中排隊並執行。Cloud Composer 環境只會使用一個集區。這個集區的大小會控管排程器在特定時間可將多少工作排入佇列以供執行。如果集區大小過小,即使尚未達到
[core]parallelism
設定選項和[celery]worker_concurrency
設定選項乘以 Airflow 工作站數所定義的門檻,排程器也無法將工作排入佇列以供執行。您可以在 Airflow UI 中設定集區大小 (依序點選「選單」>「管理」>「集區」)。根據環境中預期的平行處理程度調整集區大小。
通常
[core]parallelism
會設為工作站數量上限與[celery]worker_concurrency
的乘積。
排解執行中和已排入佇列工作相關問題
以下各節說明一些常見的執行中和已排入佇列工作問題的徵兆和可能修正方式。
未執行 DAG 執行作業
症狀:
如果動態設定 DAG 的排程日期,可能會導致各種非預期的副作用。例如:
DAG 一律會在未來執行,且 DAG 絕不會執行。
即使未執行,系統仍會將過去的 DAG 執行作業標示為已執行且成功。
詳情請參閱 Apache Airflow 說明文件。
可能的解決方案:
請按照 Apache Airflow 說明文件中的建議操作。
為 DAG 設定靜態
start_date
。您也可以使用catchup=False
停用過去日期的 DAG 執行作業。除非您瞭解這種做法的副作用,否則請避免使用
datetime.now()
或days_ago(<number of days>)
。
使用 Airflow 排程器的 TimeTable 功能
Airflow 2.2 以上版本提供時間表。
您可以透過下列任一方法,為 DAG 定義時間表:
- 使用 Python 函式
- (Cloud Composer 1 不支援此功能) 使用自訂外掛程式
你也可以使用內建時間表。
叢集資源有限
如果環境的 GKE 叢集太小,無法處理所有 DAG 和工作,您可能會遇到效能問題。在這種情況下,請嘗試下列其中一種解決方案:
- 使用效能更高的機器類型建立新環境,然後將 DAG 遷移至該環境。
- 建立更多 Cloud Composer 環境,並在這些環境之間分割 DAG。
- 如要變更 GKE 節點的機器類型,請參閱「升級 GKE 節點的機器類型」。由於這個程序容易出錯,因此最不建議使用。
- 升級環境中執行 Airflow 資料庫的 Cloud SQL 執行個體機器類型,例如使用
gcloud composer environments update
指令。Airflow 資料庫效能不佳可能是排程器速度緩慢的原因。
避免在維護期間排定工作
您可以為環境定義維護期,這樣環境維護作業就不會與 DAG 執行時間重疊。只要可以接受部分工作可能會中斷並重試,您仍可在維護期間執行 DAG。如要進一步瞭解維護期間對環境的影響,請參閱「指定維護期間」。
在 DAG 中使用「wait_for_downstream」
如果您在 DAG 中將 wait_for_downstream
參數設為 True
,則為了讓工作順利完成,該工作下游的所有工作也必須順利完成。也就是說,前一個 DAG 執行的工作可能會拖慢特定 DAG 執行作業的工作執行速度。詳情請參閱 Airflow 說明文件。
如果工作排入佇列的時間過長,系統會取消並重新排定工作
如果 Airflow 工作在佇列中保留太久,排程器會在 [scheduler]task_queued_timeout
Airflow 設定選項中設定的時間量經過後,重新排定工作執行時間。預設值為 2400
。在 2.3.1 之前的 Airflow 版本中,如果工作符合重試資格,系統也會將工作標示為失敗並重試。
如要觀察這種情況的徵兆,可以查看佇列工作數量的圖表 (Cloud Composer UI 中的「監控」分頁),如果圖表中的尖峰在約兩小時內沒有下降,工作很可能會重新排程 (沒有記錄),接著排程器記錄中會出現「Adopted tasks were still pending ...」記錄項目。在這種情況下,您可能會在 Airflow 工作記錄中看到「找不到記錄檔...」訊息,因為工作未執行。
一般來說,這是預期行為,排定工作下次執行時,應會按照排程執行。如果在 Cloud Composer 環境中發現許多這類案例,可能表示環境中的 Airflow 工作站不足,無法處理所有排定的工作。
解決方法:如要解決這個問題,請確保 Airflow 工作人員一律有足夠容量來執行佇列中的工作。舉例來說,您可以增加工作站或 worker_concurrency 的數量。您也可以調整平行處理或集區,避免排隊工作超過容量。
Cloud Composer 處理 min_file_process_interval 參數的方式
Cloud Composer 會變更 Airflow 排程器使用 [scheduler]min_file_process_interval
的方式。
Airflow 1
如果 Cloud Composer 使用 Airflow 1,使用者可以將 [scheduler]min_file_process_interval
的值設為 0 到 600 秒。如果值超過 600 秒,效果與將 [scheduler]min_file_process_interval
設為 600 秒相同。
Airflow 2
在 1.19.9 之前的 Cloud Composer 版本中,系統會忽略 [scheduler]min_file_process_interval
。
Cloud Composer 1.19.9 以上版本:
Airflow 排程器會在所有 DAG 排定後重新啟動,而 [scheduler]num_runs
參數會控管排程器執行這項操作的次數。當排程器達到 [scheduler]num_runs
排程迴圈時,系統會重新啟動排程器。排程器是無狀態元件,因此重新啟動是自動修復機制,可解決排程器可能發生的任何問題。[scheduler]num_runs
的預設值為 5000。
[scheduler]min_file_process_interval
可用於設定 DAG 剖析的頻率,但這個參數不得超過排程器在排定 DAG 時執行 [scheduler]num_runs
迴圈所需的時間。
達到 dagrun_timeout 後,將工作標示為失敗
如果 DAG 執行作業未在 dagrun_timeout
(DAG 參數) 內完成,排程器會將未完成 (正在執行、已排定時間和已加入佇列) 的工作標示為失敗。
解決方法:
延長
dagrun_timeout
以符合逾時時間。
Airflow 資料庫負載過重的症狀
有時您可能會在 Airflow 排程器記錄中看到下列警告記錄項目:
Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"
Airflow 工作站記錄檔中也可能會出現類似症狀:
MySQL:
(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"
PostgreSQL:
psycopg2.OperationalError: connection to server at ... failed
這類錯誤或警告可能是 Airflow 資料庫的徵兆,因為開放連線數量或在同一時間執行的查詢數量過多,導致排程器或其他 Airflow 元件 (例如工作站、觸發器和網頁伺服器) 負載過重。
可能的解決方案:
如要擴充 Airflow 資料庫,請變更 Cloud SQL 執行個體的機器類型,該執行個體會儲存環境的 Airflow 資料庫。
避免在 Airflow DAG 中使用全域變數。請改用環境變數和 Airflow 變數。
將
[scheduler]scheduler_heartbeat_sec
設為較高的值,例如 15 秒以上。將
[scheduler]job_heartbeat_sec
設為較高的值,例如 30 秒以上。將
[scheduler]scheduler_health_check_threshold
設為等於[scheduler]job_heartbeat_sec
乘以4
的值。
網頁伺服器顯示「排程器似乎未執行」警告
排程器會定期向 Airflow 資料庫回報活動訊號。Airflow 網路伺服器會根據這項資訊,判斷排程器是否處於啟用狀態。
有時,如果排程器負載過重,可能無法每隔 [scheduler]scheduler_heartbeat_sec
回報心跳。
在這種情況下,Airflow 網路伺服器可能會顯示下列警告:
The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.
可能的解決方案:
增加排程器的 CPU 和記憶體資源。
將 DAG 最佳化,加快剖析和排程速度,並減少耗用排程器資源。
避免在 Airflow DAG 中使用全域變數。請改用環境變數和 Airflow 變數。
提高
[scheduler]scheduler_health_check_threshold
Airflow 設定選項的值,讓網路伺服器等待更久的時間,再回報排程器無法使用。
排解回填 DAG 時遇到的問題
有時您可能想重新執行已執行的 DAG。您可以使用 Airflow CLI 指令執行這項操作,方法如下:
Airflow 2
gcloud composer environments run \
ENVIRONMENT_NAME \
--location LOCATION \
dags backfill -- -B \
-s START_DATE \
-e END_DATE \
DAG_NAME
如要只重新執行特定 DAG 的失敗工作,也請使用 --rerun-failed-tasks
引數。
Airflow 1
gcloud composer environments run \
ENVIRONMENT_NAME \
--location LOCATION \
backfill -- -B \
-s START_DATE \
-e END_DATE \
DAG_NAME
如要只重新執行特定 DAG 的失敗工作,也請使用 --rerun_failed_tasks
引數。
取代:
- 將
ENVIRONMENT_NAME
替換為環境的名稱。 LOCATION
改成環境所在的地區。START_DATE
,並提供start_date
DAG 參數的值,格式為YYYY-MM-DD
。END_DATE
,並提供end_date
DAG 參數的值,格式為YYYY-MM-DD
。- 將
DAG_NAME
替換為 DAG 的名稱。
有時,回填作業可能會產生死結情況,導致工作鎖定而無法回填。例如:
2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill
在某些情況下,您可以採取下列因應措施來解決死結問題:
如要停用迷你排程器,請覆寫
[core]schedule_after_task_execution
至False
。針對較窄的日期範圍執行回填作業。舉例來說,您可以將
START_DATE
和END_DATE
設為同一天,只指定 1 天的期間。