DAG 疑難排解

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本頁面提供常見工作流程問題的疑難排解步驟和資訊。

許多 DAG 執行問題都是環境效能不佳所致。 您可以按照「最佳化調整環境效能和成本效益」指南,將環境調整至最佳狀態。

部分 DAG 執行問題可能是 Airflow 排程器無法正常或最佳運作所致。請按照排程器疑難排解說明解決這些問題。

排解工作流程問題

如何開始進行疑難排解:

  1. 查看 Airflow 記錄

    您可以覆寫下列 Airflow 設定選項,提高 Airflow 的記錄層級。

    區段
    logging logging_level 預設值為 INFO。設為 DEBUG 可在記錄訊息中取得更多詳細資訊。
  2. 查看監控資訊主頁

  3. 查看 Cloud Monitoring

  4. 在 Google Cloud 控制台中,前往環境的元件頁面查看是否有錯誤。

  5. Airflow 網頁介面中查看 DAG 的圖表檢視,檢查是否有失敗的工作執行個體。

    區段
    webserver dag_orientation LRTBRLBT

針對運算子錯誤進行偵錯

如何針對運算子錯誤進行偵錯:

  1. 檢查是否有與特定工作相關的錯誤
  2. 查看 Airflow 記錄
  3. 查看 Cloud Monitoring
  4. 查看運算子專屬記錄。
  5. 修正錯誤。
  6. 將 DAG 上傳/dags 資料夾。
  7. 在 Airflow 網頁介面中,清除 DAG 的狀態記錄
  8. 繼續或執行 DAG。

排解工作執行問題

Airflow 是分散式系統,包含許多實體 (例如排程器、執行器、工作站),這些實體會透過工作佇列和 Airflow 資料庫彼此通訊,並傳送信號 (例如 SIGTERM)。下圖顯示 Airflow 元件之間的互連概況。

Airflow 元件之間的互動
圖 1. Airflow 元件之間的互動 (按一下可放大)

在 Airflow 這類分散式系統中,可能會發生網路連線問題,或基礎架構可能發生間歇性問題;這可能會導致工作失敗並重新排定執行時間,或工作可能無法順利完成 (例如殭屍工作,或卡在執行中的工作)。Airflow 具有處理這類情況的機制,可自動恢復正常運作。以下各節說明 Airflow 執行工作時發生的常見問題:殭屍工作、終止執行個體和 SIGTERM 訊號。

排解無效工作問題

Airflow 會偵測工作與執行工作的程序之間有以下兩種不符情況:

  • 無效工作是指應該執行但未執行的工作。如果工作程序已終止或沒有回應、Airflow 工作人員因負載過重而未及時回報工作狀態,或是執行工作的 VM 已關閉,就可能發生這種情況。Airflow 會定期尋找這類工作,並視工作設定而定,讓工作失敗或重試。

    找出無效工作

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("airflow-scheduler")
    textPayload:"Detected zombie job"
  • 殭屍工作是指不應執行的工作。Airflow 會定期找出這類工作並終止。

以下各節說明殭屍工作最常見的原因和解決方案。

Airflow 工作站記憶體不足

每個 Airflow 工作人員最多可同時執行 [celery]worker_concurrency 個工作執行個體。如果這些工作執行個體的累計記憶體用量超過 Airflow 工作站的記憶體限制,系統會終止工作站上的隨機程序,以釋出資源。

有時,Airflow 工作人員的記憶體不足,可能會導致在 SQL Alchemy 工作階段期間,傳送至資料庫、DNS 伺服器或 DAG 呼叫的任何其他服務的封包格式錯誤。在這種情況下,連線的另一端可能會拒絕或捨棄來自 Airflow 工作人員的連線。例如:

"UNKNOWN:Error received from peer
{created_time:"2024-11-31T10:09:52.217738071+00:00", grpc_status:14,
grpc_message:"failed to connect to all addresses; last error: UNKNOWN:
ipv4:<ip address>:443: handshaker shutdown"}"

解決方法

Airflow 工作站已遭撤銷

在 Kubernetes 上執行工作負載時,Pod 逐出是正常現象。如果 Pod 儲存空間不足,或為了優先順序較高的工作負載釋出資源,GKE 就會逐出 Pod。

解決方法

Airflow 工作人員已終止

Airflow 工作站可能會從外部移除。如果目前執行的工作在正常終止期間未完成,就會遭到中斷,且可能最終會被偵測為殭屍。

可能的情況和解決方案

  • 修改環境時 (例如升級或安裝套件),系統會重新啟動 Airflow 工作站:

    探索 Composer 環境修改內容

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("cloudaudit.googleapis.com%2Factivity")

    您可以在沒有執行重要工作時執行這類作業,或啟用工作重試功能。

  • 維護作業期間,各種元件可能暫時無法使用。

    您可以指定維護期間,盡量減少

    與重要工作執行作業重疊。

Airflow 工作站負載過高

Airflow 工作站可用的 CPU 和記憶體資源量受環境設定限制。如果資源用量接近上限,可能會導致資源爭用,並在工作執行期間發生不必要的延遲。在極端情況下,如果資源長期不足,可能會導致殭屍工作。

解決方法

Airflow 資料庫負載過重

各種 Airflow 元件會使用資料庫彼此通訊,特別是儲存工作執行個體的脈衝訊號。資料庫資源不足會導致查詢時間變長,並可能影響工作執行。

有時,Airflow 工作站的記錄檔會出現下列錯誤:

(psycopg2.OperationalError) connection to server at <IP address>,
port 3306 failed: server closed the connection unexpectedly

This probably means the server terminated abnormally before or while
processing the request.

解決方法

Airflow 資料庫暫時無法使用

Airflow 工作站可能需要時間偵測並妥善處理間歇性錯誤,例如暫時性連線問題。這可能會超過預設的僵屍偵測門檻。

瞭解 Airflow 活動訊號逾時

resource.type="cloud_composer_environment"
resource.labels.environment_name="ENVIRONMENT_NAME"
log_id("airflow-worker")
textPayload:"Heartbeat time limit exceeded"

解決方法

  • 增加無效工作的逾時時間,並覆寫 Airflow 設定選項 [scheduler]scheduler_zombie_task_threshold 的值:

    區段 附註
    scheduler scheduler_zombie_task_threshold 新逾時時間 (秒) 預設值為 300

排解終止執行個體的問題

Airflow 會使用終止執行個體機制關閉 Airflow 工作。 這個機制適用於下列情況:

  • 排程器終止未準時完成的工作。
  • 工作逾時或執行時間過長。

Airflow 終止工作例項時,您可以在執行該工作的 Airflow 工作站記錄中,看到下列記錄項目:

  INFO - Subtask ... WARNING - State of this instance has been externally set
  to success. Terminating instance.
  INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
  INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.

可能的解決方案:

  • 檢查工作程式碼是否有錯誤,導致工作執行時間過長。

  • 增加 Airflow 工作站的 CPU 和記憶體,加快工作執行速度。

  • 調高 [celery_broker_transport_options]visibility_timeout Airflow 設定選項的值。

    因此,排程器會等待較長的時間,才會將工作視為殭屍工作。如果工作耗時且持續數小時,這個選項就特別實用。如果值太低 (例如 3 小時),排程器會將執行 5 或 6 小時的工作視為「停止回應」(殭屍工作)。

  • 提高 [core]killed_task_cleanup_time Airflow 設定選項的值。

    值越大,Airflow 工作站完成工作所需的時間就越長。如果值太低,Airflow 工作可能會突然中斷,沒有足夠時間順利完成工作。

排解 SIGTERM 信號問題

Linux、Kubernetes、Airflow 排程器和 Celery 會使用 SIGTERM 信號,終止負責執行 Airflow 工作站或 Airflow 工作的程序。

環境中傳送 SIGTERM 信號的原因可能有很多:

  • 工作已成為無效工作,必須停止。

  • 排程器發現重複的工作,並將「終止執行個體」和 SIGTERM 信號傳送至工作,以停止工作。

  • 水平 Pod 自動調度資源中,GKE 控制層會傳送 SIGTERM 訊號,移除不再需要的 Pod。

  • 排程器可以將 SIGTERM 信號傳送至 DagFileProcessorManager 程序。排程器會使用這類 SIGTERM 信號管理 DagFileProcessorManager 程序生命週期,因此可以放心忽略。

    範例:

    Launched DagFileProcessorManager with pid: 353002
    Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: []
    Sending the signal Signals.SIGTERM to group 353002
    Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
    
  • local_task_job 中,心跳回呼與結束回呼之間的競爭情況,會監控工作執行作業。如果心跳偵測到工作標示為成功,則無法區分工作本身是否成功,或是 Airflow 收到指令,將工作視為成功。不過,這會終止工作執行器,不會等待執行器結束。

    您可以放心忽略這類 SIGTERM 信號。工作已處於成功狀態,DAG 執行作業整體不會受到影響。

    一般結束與成功狀態的工作終止之間,唯一的差異在於記錄項目 Received SIGTERM.

    活動訊號和結束回呼之間的競爭情況
    圖 2. 心跳和結束回呼之間的競爭條件 (按一下可放大)
  • Airflow 元件使用的資源 (CPU、記憶體) 超出叢集節點允許的範圍。

  • GKE 服務會執行維護作業,並將 SIGTERM 信號傳送至即將升級的節點上執行的 Pod。

    以 SIGTERM 終止工作執行個體時,您可以在執行工作的 Airflow 工作站記錄中,看到下列記錄項目:

    {local_task_job.py:211} WARNING - State of this instance has been externally
    set to queued. Terminating instance. {taskinstance.py:1411} ERROR - Received
    SIGTERM. Terminating subprocesses. {taskinstance.py:1703} ERROR - Task failed
    with exception
    

可能的解決方案:

如果執行工作的 VM 記憶體不足,就會發生這個問題。這與 Airflow 設定無關,而是與 VM 可用的記憶體容量有關。

  • 在 Cloud Composer 3 中,您可以為 Airflow 工作站指派更多 CPU 和記憶體資源。

  • 您可以降低 [celery]worker_concurrency concurrency Airflow 設定選項的值。這個選項會決定特定 Airflow 工作站要同時執行的工作數量。

如要進一步瞭解如何最佳化調整環境,請參閱「最佳化調整環境效能和成本效益」。

更新或升級作業對 Airflow 工作執行的影響

更新或升級作業會中斷目前執行的 Airflow 工作,除非工作是以可延遲模式執行。

建議您在預期對 Airflow 工作執行作業的影響最小時執行這些作業,並在 DAG 和工作中設定適當的重試機制。

排解 KubernetesExecutor 工作問題

CeleryKubernetesExecutor 是 Cloud Composer 3 中的一種執行器,可同時使用 CeleryExecutor 和 KubernetesExecutor。

如要進一步瞭解如何排解透過 KubernetesExecutor 執行的工作問題,請參閱「使用 CeleryKubernetesExecutor」頁面。

常見問題

以下各節說明一些常見 DAG 問題的徵兆和可能修正方式。

Airflow 工作遭 Negsignal.SIGKILL 中斷

有時工作使用的記憶體可能超出 Airflow 工作站的分配量。 在這種情況下,Negsignal.SIGKILL 可能會中斷作業。系統會傳送這項信號,避免進一步耗用記憶體,進而影響其他 Airflow 工作的執行。在 Airflow 工作站的記錄中,您可能會看到下列記錄項目:

{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL

Negsignal.SIGKILL 也可能顯示為程式碼 -9

可能的解決方案:

  • 降低 Airflow 工作站的 worker_concurrency

  • 增加 Airflow 工作站可用的記憶體量。

  • 在 Cloud Composer 中管理耗用大量資源的工作時,可以使用 KubernetesPodOperatorGKEStartPodOperator 隔離工作,並自訂資源分配。

  • 將工作最佳化,減少記憶體用量。

工作因 DAG 剖析錯誤而失敗,但未產生記錄

有時 DAG 可能會發生細微錯誤,導致 Airflow 排程器可以排定任務執行作業、DAG 處理器可以剖析 DAG 檔案,但 Airflow 工作站無法執行 DAG 中的任務,因為 DAG 檔案有程式設計錯誤。這可能會導致 Airflow 工作標示為 Failed,但沒有執行記錄。

解決方法:

  • 在 Airflow 工作站記錄檔中,確認 Airflow 工作站未因 DAG 遺失或 DAG 剖析錯誤而引發任何錯誤。

  • 增加與 DAG 剖析相關的參數:

    • 將 [dagbag-import-timeout][ext-airflow-dagrun-import-timeout] 增加至至少 120 秒 (或視需要增加)。

    • dag-file-processor-timeout 至少增加至 180 秒 (或視需要增加)。這個值必須大於 dagbag-import-timeout

  • 另請參閱「排解 DAG 處理器問題」。

工作因資源壓力而失敗,但未產生記錄

症狀:執行工作期間,負責執行 Airflow 工作的 Airflow 工作站子程序突然中斷。Airflow 工作人員記錄中顯示的錯誤可能類似以下內容:

...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task    R = retval = fun(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__    return self.run(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command    _execute_in_fork(command_to_exec)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...

解決方法:

工作因 Pod 移出而失敗,但未產生記錄

Google Kubernetes Engine Pod 須遵守 Kubernetes Pod 生命週期和 Pod 逐出程序。工作量暴增是 Cloud Composer 中 Pod 遭到驅逐的最常見原因。

如果特定 Pod 相對節點的設定資源耗用預期值,過度使用節點資源,就可能遭到驅逐。舉例來說,如果 Pod 中執行多項耗用大量記憶體的工作,且這些工作加總的負載導致 Pod 執行的節點超出記憶體耗用量上限,就可能發生驅逐作業。

如果 Airflow 工作站 Pod 遭到驅逐,在該 Pod 上執行的所有工作例項都會中斷,稍後 Airflow 會將這些例項標示為失敗。

記錄會經過緩衝處理。如果工作站 Pod 在緩衝區清除前遭到移除,就不會產生記錄。如果工作失敗但未產生記錄,表示 Airflow 工作站因記憶體不足 (OOM) 而重新啟動。即使未發出 Airflow 記錄,Cloud Logging 中仍可能存在部分記錄。

如要查看記錄:

  1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  2. 在環境清單中,按一下環境名稱。 「環境詳細資料」頁面隨即開啟。

  3. 前往「記錄」分頁。

  4. 如要查看個別 Airflow 工作人員的記錄檔,請依序點選「所有記錄」>「Airflow 記錄」>「工作人員」

解決方法:

  • 提高 Airflow 工作站的記憶體限制

  • 確認 DAG 中的工作皆為冪等且可重試。

  • 避免將不必要的檔案下載至 Airflow 工作站的本機檔案系統。

    Airflow 工作人員的本機檔案系統容量有限。 Airflow 工作站的儲存空間可介於 1 GB 到 10 GB 之間。 儲存空間用盡時,GKE 控制層會逐出 Airflow 工作站 Pod。這會導致遭逐出的工作站執行所有工作失敗。

    有問題的作業範例:

    • 下載檔案或物件,並在本機 Airflow 工作人員中儲存。請改為直接將這些物件儲存在合適的服務中,例如 Cloud Storage 值區。
    • 從 Airflow 工作站存取 /data 資料夾中的大型物件。Airflow 工作站會將物件下載至本機檔案系統。 請改為實作 DAG,以便在 Airflow 工作人員 Pod 外部處理大型檔案。

DAG 執行作業未在預期時間內結束

症狀:

有時 DAG 執行作業不會結束,因為 Airflow 工作會停滯,且 DAG 執行作業持續時間超出預期。在正常情況下,Airflow 工作不會無限期處於佇列或執行狀態,因為 Airflow 具有逾時和清除程序,可避免這種情況。

修正方式:

  • 請使用 DAG 的 dagrun_timeout 參數。例如:dagrun_timeout=timedelta(minutes=120)。因此,每個 DAG 執行作業都必須在 DAG 執行作業逾時前完成。如要進一步瞭解 Airflow 工作狀態,請參閱 Apache Airflow 說明文件

  • 使用「工作執行逾時」參數,為根據 Apache Airflow 運算子執行的工作定義預設逾時時間。

進出 Airflow 資料庫的網路流量增加

環境的 GKE 叢集與 Airflow 資料庫之間的網路流量大小,取決於 DAG 數量、DAG 中的工作數量,以及 DAG 存取 Airflow 資料庫中資料的方式。下列因素可能會影響網路用量:

  • 查詢 Airflow 資料庫。如果 DAG 執行大量查詢,就會產生大量流量。例如:先檢查工作狀態,再繼續執行其他工作、查詢 XCom 表格、傾印 Airflow 資料庫內容。

  • 大量工作。排程的工作越多,產生的網路流量就越多。這項考量因素適用於 DAG 中的工作總數和排程頻率。Airflow 排程器排定 DAG 執行作業時,會查詢 Airflow 資料庫並產生流量。

  • Airflow 網頁介面會查詢 Airflow 資料庫,因此產生網路流量。大量使用含有圖表、工作和圖解的頁面,可能會產生大量網路流量。

請勿同時排定以程式輔助方式產生的 DAG

從 DAG 檔案以程式輔助方式產生 DAG 物件,是撰寫許多只有些微差異的類似 DAG 的有效方法。

請務必不要立即排定所有這類 DAG 的執行時間。Airflow 工作站很可能沒有足夠的 CPU 和記憶體資源,可同時執行所有排定的工作。

如要避免以程式輔助方式排定 DAG 時發生問題,請採取下列做法:

  • 提高工作站並行數,擴充環境,以便同時執行更多工作。
  • 產生 DAG 時,請確保排程平均分配在一段時間內,避免同時排定數百項工作,讓 Airflow 工作站有時間執行所有排定的工作。

存取 Airflow 網路伺服器時發生錯誤 504

請參閱「存取 Airflow UI 時發生錯誤 504」。

在工作執行期間或執行完畢後,查詢例外狀況會擲回與 Postgres 伺服器的連線中斷

如果符合下列條件,通常會發生 Lost connection to Postgres server during query 例外狀況:

  • 您的 DAG 使用 PythonOperator 或自訂運算子。
  • 您的 DAG 會查詢 Airflow 資料庫。

如果從可呼叫函式發出多個查詢,回溯可能會錯誤地指向 Airflow 程式碼中的 self.refresh_from_db(lock_for_update=True) 行;這是工作執行後的第一次資料庫查詢。例外狀況的實際原因發生在此之前,也就是 SQLAlchemy 工作階段未正確關閉時。

SQLAlchemy 工作階段的範圍限定為執行緒,且是在可呼叫函式中建立,工作階段稍後可在 Airflow 程式碼中繼續。如果單一工作階段內的查詢之間有明顯延遲,Postgres 伺服器可能已關閉連線。Cloud Composer 環境中的連線逾時時間設為大約 10 分鐘。

解決方法:

  • 使用 airflow.utils.db.provide_session 修飾符。這個裝飾器會在 session 參數中提供有效的 Airflow 資料庫工作階段,並在函式結尾正確關閉工作階段。
  • 請勿使用單一長期執行的函式。請改為將所有資料庫查詢作業移至個別函式,這樣就會有多個函式使用 airflow.utils.db.provide_session 裝飾器。在這種情況下,系統會在擷取查詢結果後自動關閉工作階段。

控管 DAG、工作和相同 DAG 的平行執行作業的執行時間

如要控管特定 DAG 的單一 DAG 執行作業持續時間,可以使用 dagrun_timeout DAG 參數。舉例來說,如果您預期單一 DAG 執行作業 (無論執行作業是否成功完成) 的持續時間不得超過 1 小時,請將這個參數設為 3600 秒。

您也可以控管單一 Airflow 工作可持續的時間長度。如要這麼做,可以使用 execution_timeout

如要控管特定 DAG 的有效 DAG 執行次數,可以使用 [core]max-active-runs-per-dag Airflow 設定選項

如要確保在特定時間只執行一個 DAG 執行個體,請將 max-active-runs-per-dag 參數設為 1

連線至 Airflow 中繼資料資料庫時發生暫時性中斷

Cloud Composer 是在分散式基礎架構上執行,這表示偶爾可能會出現暫時性問題,導致 Airflow 工作執行中斷。

在這種情況下,您可能會在 Airflow 工作人員的記錄中看到下列錯誤訊息:

"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"

"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"

如果 Cloud Composer 環境正在進行維護作業,也可能導致這類間歇性問題。

這類錯誤通常是間歇性的,如果 Airflow 工作是冪等的,且您已設定重試,就不會受到影響。您也可以考慮定義維護期間

造成這類錯誤的另一個可能原因是環境叢集缺少資源。在這種情況下,您可以按照「擴充環境」或「最佳化環境」的指示,擴充或最佳化環境。

DAG 執行作業標示為成功,但沒有執行的工作

如果 DAG 執行作業 execution_date 早於 DAG 的 start_date,您可能會看到沒有任何任務執行作業,但仍標示為成功的 DAG 執行作業。

DAG 執行作業成功,但未執行任何工作
圖 3. 成功的 DAG 執行作業,但未執行任何工作 (按一下可放大)

原因

這可能發生於下列情況:

  • DAG 的execution_datestart_date時區不同,就會導致不符。舉例來說,使用 pendulum.parse(...) 設定 start_date 時,可能會發生這種情況。

  • DAG 的 start_date 設為動態值,例如 airflow.utils.dates.days_ago(1)

解決方案

  • 確認 execution_datestart_date 使用相同的時區。

  • 指定靜態 start_date 並與 catchup=False 合併,以免 DAG 的開始日期是過去的日期。

Airflow 資料庫負載過重的症狀

詳情請參閱「Airflow 資料庫負載壓力過大的症狀」。

後續步驟