排解 Airflow 排程器問題

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本頁面提供 Airflow 排程器和 DAG 處理器常見問題的疑難排解步驟和資訊。

找出問題來源

如要開始排解問題,請先判斷問題發生在:

  • 在 DAG 剖析期間,Airflow DAG 處理器會剖析 DAG
  • 執行期間,Airflow 排程器處理 DAG 時

如要進一步瞭解剖析時間和執行時間,請參閱「DAG 剖析時間和 DAG 執行時間的差異」。

檢查 DAG 處理問題

  1. 檢查 DAG 處理器記錄
  2. 檢查 DAG 剖析時間

監控執行中和已排入佇列的工作

如要檢查是否有工作停滯在佇列中,請按照下列步驟操作。

  1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  2. 在環境清單中,按一下環境名稱。 「環境詳細資料」頁面隨即開啟。

  3. 前往「監控」分頁。

  4. 在「監控」分頁中,查看「DAG 執行」部分的「Airflow 工作」圖表,找出可能的問題。Airflow 工作是指在 Airflow 中處於佇列狀態的工作,這些工作可前往 Celery 或 Kubernetes 執行器代理程式佇列。Celery 佇列中的工作是指已排入 Celery 代理程式佇列中的工作執行個體。

排解 DAG 剖析時的問題

以下各節說明 DAG 剖析時發生的一些常見問題,以及可能的修正方式。

工作數量和時間分布

如果同時排定大量 DAG 或工作,Airflow 可能會發生問題。如要避免預約問題,請採取下列做法:

  • 調整 DAG,減少任務數量並整合任務。
  • 調整 DAG 的排程間隔,讓 DAG 執行作業更平均地分配到各個時間點。

調整 Airflow 設定

Airflow 提供 Airflow 設定選項,可控管 Airflow 同時執行的工作和 DAG 數量。如要設定這些設定選項,請覆寫環境中的值。您也可以在 DAG 或工作層級設定部分值。

  • 工作人員並行

    [celery]worker_concurrency 參數可控管 Airflow 工作站可同時執行的工作數量上限。如果將這個參數的值乘以 Cloud Composer 環境中的 Airflow 工作站數量,即可得出環境在特定時間可執行的最大工作數量。這個數字受限於 [core]parallelism Airflow 設定選項,詳情請參閱下文。

    在 Cloud Composer 3 環境中,系統會根據工作站可容納的輕量型並行工作執行個體數量,自動計算 [celery]worker_concurrency 的預設值。也就是說,這個值取決於工作站資源限制。工作站並行值與環境中的工作站數量無關。

  • 有效 DAG 執行作業數量上限

    [core]max_active_runs_per_dag Airflow 設定選項可控制每個 DAG 的有效 DAG 執行數上限。如果達到這項限制,排程器就不會建立更多 DAG 執行作業。

    如果這個參數設定錯誤,排程器可能會因為無法在特定時間建立更多 DAG 執行個體,而限制 DAG 執行次數。

    您也可以使用 max_active_runs 參數,在 DAG 層級設定這個值。

  • 每個 DAG 的最多進行中工作數

    [core]max_active_tasks_per_dag Airflow 設定選項可控管每個 DAG 中可同時執行的工作執行個體數量上限。

    如果這個參數設定錯誤,您可能會遇到問題,因為在特定時間只能執行有限數量的 DAG 工作,導致單一 DAG 例項的執行速度緩慢。在這種情況下,您可以增加這個設定選項的值。

    您也可以使用 max_active_tasks 參數,在 DAG 層級設定這個值。

    您可以在工作層級使用 max_active_tis_per_dagmax_active_tis_per_dagrun 參數,控管每個 DAG 和 DAG 執行作業允許執行的特定工作 ID 執行個體數量。

  • 平行處理和集區大小

    [core]parallelism Airflow 設定選項會控管 Airflow 排程器在滿足這些工作的所有依附元件後,可在執行器的佇列中排定多少工作。

    這是整個 Airflow 設定的全域參數。

    工作會在集區中排隊並執行。Cloud Composer 環境只會使用一個集區。這個集區的大小會控管排程器在特定時間可將多少工作排入佇列以供執行。如果集區大小過小,即使尚未達到 [core]parallelism 設定選項和 [celery]worker_concurrency 設定選項乘以 Airflow 工作站數所定義的門檻,排程器也無法將工作排入佇列以供執行。

    您可以在 Airflow UI 中設定集區大小 (依序點選「選單」>「管理」>「集區」)。根據環境中預期的平行處理程度調整集區大小。

    通常 [core]parallelism 會設為工作站數量上限與 [celery]worker_concurrency 的乘積。

排解執行中和已排入佇列工作相關問題

以下各節說明一些常見的執行中和已排入佇列工作問題的徵兆和可能修正方式。

未執行 DAG 執行作業

症狀:

如果動態設定 DAG 的排程日期,可能會導致各種非預期的副作用。例如:

  • DAG 一律會在未來執行,且 DAG 絕不會執行。

  • 即使未執行,系統仍會將過去的 DAG 執行作業標示為已執行且成功。

詳情請參閱 Apache Airflow 說明文件

可能的解決方案:

  • 請按照 Apache Airflow 說明文件中的建議操作。

  • 為 DAG 設定靜態 start_date。您也可以使用 catchup=False 停用過去日期的 DAG 執行作業。

  • 除非您瞭解這種做法的副作用,否則請避免使用 datetime.now()days_ago(<number of days>)

使用 Airflow 排程器的 TimeTable 功能

Cloud Composer 3 不支援 Airflow 排程器的自訂外掛程式,包括在 DAG 中實作的時間表。外掛程式不會同步處理至環境中的排程器。

您仍可在 Cloud Composer 3 中使用內建時間表

避免在維護期間排定工作

您可以為環境定義維護期,這樣環境維護作業就不會與 DAG 執行時間重疊。只要可以接受部分工作可能會中斷並重試,您仍可在維護期間執行 DAG。如要進一步瞭解維護期間對環境的影響,請參閱「指定維護期間」。

在 DAG 中使用「wait_for_downstream」

如果您在 DAG 中將 wait_for_downstream 參數設為 True,則為了讓工作順利完成,該工作下游的所有工作也必須順利完成。也就是說,前一個 DAG 執行的工作可能會拖慢特定 DAG 執行作業的工作執行速度。詳情請參閱 Airflow 說明文件

如果工作排入佇列的時間過長,系統會取消並重新排定工作

如果 Airflow 工作在佇列中保留太久,排程器會在 [scheduler]task_queued_timeout Airflow 設定選項中設定的時間量經過後,重新排定工作執行時間。預設值為 2400

如要觀察這種情況的徵兆,可以查看佇列工作數量的圖表 (Cloud Composer UI 中的「監控」分頁),如果圖表中的尖峰在約兩小時內沒有下降,工作很可能會重新排程 (沒有記錄),接著排程器記錄中會出現「Adopted tasks were still pending ...」記錄項目。在這種情況下,您可能會在 Airflow 工作記錄中看到「找不到記錄檔...」訊息,因為工作未執行。

一般來說,這是預期行為,排定工作下次執行時,應會按照排程執行。如果在 Cloud Composer 環境中發現許多這類案例,可能表示環境中的 Airflow 工作站不足,無法處理所有排定的工作。

解決方法:如要解決這個問題,請確保 Airflow 工作人員一律有足夠容量來執行佇列中的工作。舉例來說,您可以增加工作站或 worker_concurrency 的數量。您也可以調整平行處理或集區,避免排隊工作超過容量。

Cloud Composer 處理 min_file_process_interval 參數的方式

Cloud Composer 會變更 Airflow 排程器使用 [scheduler]min_file_process_interval 的方式。

Airflow 排程器會在所有 DAG 排定後重新啟動,而 [scheduler]num_runs 參數會控管排程器執行這項操作的次數。當排程器達到 [scheduler]num_runs 排程迴圈時,系統會重新啟動排程器。排程器是無狀態元件,因此重新啟動是自動修復機制,可解決排程器可能發生的任何問題。[scheduler]num_runs 的預設值為 5000。

[scheduler]min_file_process_interval 可用於設定 DAG 剖析的頻率,但這個參數不得超過排程器在排定 DAG 時執行 [scheduler]num_runs 迴圈所需的時間。

達到 dagrun_timeout 後,將工作標示為失敗

如果 DAG 執行作業未在 dagrun_timeout (DAG 參數) 內完成,排程器會將未完成 (正在執行、已排定時間和已加入佇列) 的工作標示為失敗。

解決方法:

Airflow 資料庫負載過重的症狀

有時您可能會在 Airflow 工作站記錄中看到下列警告記錄項目:

psycopg2.OperationalError: connection to server at ... failed

這類錯誤或警告可能是 Airflow 資料庫的徵兆,因為開放連線數量或在同一時間執行的查詢數量過多,導致排程器或其他 Airflow 元件 (例如工作站、觸發器和網頁伺服器) 負載過重。

可能的解決方案:

網頁伺服器顯示「排程器似乎未執行」警告

排程器會定期向 Airflow 資料庫回報活動訊號。Airflow 網路伺服器會根據這項資訊,判斷排程器是否處於啟用狀態。

有時,如果排程器負載過重,可能無法每隔 [scheduler]scheduler_heartbeat_sec 回報心跳。

在這種情況下,Airflow 網路伺服器可能會顯示下列警告:

The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.

可能的解決方案:

  • 增加排程器的 CPU 和記憶體資源。

  • 將 DAG 最佳化,加快剖析和排程速度,並減少耗用排程器資源。

  • 避免在 Airflow DAG 中使用全域變數。請改用環境變數Airflow 變數

  • 提高 [scheduler]scheduler_health_check_threshold Airflow 設定選項的值,讓網路伺服器等待更久的時間,再回報排程器無法使用。

排解回填 DAG 時遇到的問題

有時您可能想重新執行已執行的 DAG。您可以使用 Airflow CLI 指令執行這項操作,方法如下:

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
   dags backfill -- -B \
   -s START_DATE \
   -e END_DATE \
   DAG_NAME

如要只重新執行特定 DAG 的失敗工作,也請使用 --rerun-failed-tasks 引數。

取代:

  • ENVIRONMENT_NAME 替換為環境的名稱。
  • LOCATION 改成環境所在的地區。
  • START_DATE,並提供 start_date DAG 參數的值,格式為 YYYY-MM-DD
  • END_DATE,並提供 end_date DAG 參數的值,格式為 YYYY-MM-DD
  • DAG_NAME 替換為 DAG 的名稱。

有時,回填作業可能會產生死結情況,導致工作鎖定而無法回填。例如:

2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill

在某些情況下,您可以採取下列因應措施來解決死結問題:

  • 如要停用迷你排程器,請覆寫 [core]schedule_after_task_executionFalse

  • 針對較窄的日期範圍執行回填作業。舉例來說,您可以將 START_DATEEND_DATE 設為同一天,只指定 1 天的期間。

後續步驟