偵錯工作排程問題

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本教學課程將引導您診斷及排解工作排程和剖析問題,這些問題會導致排程器發生故障、剖析錯誤和延遲,以及工作失敗。

簡介

Airflow 排程器主要受到兩個因素的影響:工作排程和 DAG 剖析。其中一個因素出現問題,可能會對環境健康和效能造成負面影響。

有時會同時排定太多工作,在這種情況下,佇列會被填滿,工作會維持在「已排程」狀態,或是在排入佇列後重新排程,這可能會導致工作失敗和效能延遲。

另一個常見問題是剖析延遲和錯誤,這些問題是由於 DAG 程式碼複雜度過高所致。舉例來說,如果 DAG 程式碼在程式碼頂層包含 Airflow 變數,可能會導致剖析延遲、資料庫超載、排程失敗和 DAG 逾時。

在本教學課程中,您將診斷 DAG 範例,並瞭解如何排解排程和剖析問題、改善 DAG 排程,以及最佳化 DAG 程式碼和環境設定,以提升效能。

目標

本節列出本教學課程範例的目標。

範例:高工作負載並行性導致排程器異常運作和延遲

  • 上傳同時執行多次的 DAG 範例,並使用 Cloud Monitoring 診斷排程器異常和延遲問題。

  • 合併工作,並評估效能影響,以便最佳化 DAG 程式碼。

  • 在一段時間內更平均地分配工作,並評估成效影響。

  • 調整 Airflow 設定和環境設定,並評估影響。

範例:DAG 剖析錯誤和複雜程式碼造成的延遲

  • 上傳含有 Airflow 變數的 DAG 範例,並使用 Cloud Monitoring 診斷剖析問題。

  • 避免在程式碼頂層使用 Airflow 變數,藉此最佳化 DAG 程式碼,並評估對剖析時間的影響。

  • 最佳化 Airflow 設定和環境設定,並評估對剖析時間的影響。

費用

本教學課程使用 Google Cloud的下列計費元件:

完成本教學課程後,您可以刪除建立的資源以避免繼續計費。詳情請參閱「清除所用資源」一節。

事前準備

本節說明開始教學課程前必須執行的動作。

建立及設定專案

本教學課程需要 Google Cloud 專案。請按照下列方式設定專案:

  1. 在 Google Cloud 控制台中選取或建立專案

    前往專案選取器

  2. 請確認您已為專案啟用計費功能。瞭解如何檢查專案是否已啟用計費功能

  3. 請確認 Google Cloud 專案使用者具備下列角色,以便建立必要資源:

    • 環境與 Storage 物件管理員 (roles/composer.environmentAndStorageObjectAdmin)
    • Compute Admin (roles/compute.admin)

為專案啟用 API

Enable the Cloud Composer API.

Enable the API

建立 Cloud Composer 環境

建立 Cloud Composer 2 環境

建立環境時的預設預設值。

在建立環境的過程中,您會將 Cloud Composer v2 API 服務代理人擴充角色 (roles/composer.ServiceAgentV2Ext) 授予 Composer 服務代理人帳戶。Cloud Composer 會使用這個帳戶在 Google Cloud 專案中執行作業。

範例:工作排程問題導致排程器運作異常,導致工作失敗

這個範例說明排程器發生故障和高工作並行性造成的延遲情形。

將範例 DAG 上傳至環境

上傳以下 DAG 範例至您在先前步驟中建立的環境。在本教學課程中,這個 DAG 的名稱為 dag_10_tasks_200_seconds_1

這個 DAG 有 200 項工作。每個工作都會等待 1 秒,然後列印「Complete!」上傳後,系統會自動觸發 DAG。Cloud Composer 會執行這個 DAG 10 次,所有 DAG 執行作業都會以並行方式進行。

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 200
seconds = 1
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2023, 11, 22, 20, 0),
    end_date=datetime(2023, 11, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

診斷排程器運作異常和工作失敗問題

DAG 執行完畢後,請開啟 Airflow UI,然後按一下 dag_10_tasks_200_seconds_1 DAG。您會看到總共 10 個 DAG 執行作業成功,且每個 DAG 都有 200 個成功的任務。

查看 Airflow 工作記錄:

  1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  2. 在環境清單中,按一下環境名稱。「環境詳細資料」頁面隨即開啟。

  3. 前往「Logs」分頁,然後依序前往「All logs」>「Airflow logs」>「Workers」>「View in Logs Explorer」

在記錄直方圖中,您可以看到以紅色和橘色標示的錯誤和警告:

Airflow 工作站記錄檔的直方圖,其中錯誤和警告以紅色和橘色標示
圖 1. Airflow 工作站記錄直方圖 (按一下可放大)

這個範例 DAG 會產生約 130 個警告和 60 個錯誤。按一下任何包含黃色和紅色長條的欄。您會在記錄中看到下列部分警告和錯誤:

State of this instance has been externally set to success. Terminating
instance.

Received SIGTERM. Terminating subprocesses.

worker: Warm shutdown (MainProcess).

這些記錄可能表示資源用量超出限制,且 worker 自行重新啟動。

如果 Airflow 工作佇列中的任務保留太久,排程器會將其標示為失敗並標示為 up_for_retry,並重新排程以便執行。觀察這種情況的症狀之一,就是查看佇列工作數量的圖表,如果這張圖表中的尖峰值在約 10 分鐘內未下降,則工作可能會失敗 (且沒有記錄檔)。

查看監控資訊:

  1. 前往「監控」分頁,然後選取「總覽」

  2. 查看「Airflow 工作」圖表。

    Airflow 工作隨著時間變化的圖表,顯示佇列工作數量激增
    圖 2. Airflow 工作圖表 (按一下可放大)

    在 Airflow 工作流程圖中,佇列工作會出現長達 10 分鐘的尖峰,這可能表示環境中沒有足夠的資源來處理所有排定的工作。

  3. 查看「活躍的工作站數量」圖表:

    這張圖表顯示,隨著時間推移,運作中 Airflow 工作站的數量已達到上限
    圖 3. 活躍的工作站圖表 (按一下可放大)

    「有效工作站」圖表顯示,DAG 在執行期間觸發自動調度資源,並達到允許的最大工作站數量上限 (三個)。

  4. 資源使用率圖表可能會指出 Airflow 工作者缺乏執行排隊工作的能力。在「監控」分頁中,選取「Workers」,然後查看「Total worker CPU usage」和「Total worker memory usage」圖表。

    Airflow 工作站 CPU 使用率圖表顯示 CPU 使用率會增加至上限
    圖 4. 工作站 CPU 使用率總計圖表 (點選即可放大)
    Airflow 工作站記憶體使用率圖表顯示記憶體用量增加,但未達上限
    圖 5. 工作站記憶體總用量圖表 (按一下即可放大)

    圖表顯示同時執行太多工作導致 CPU 達到上限。資源已使用超過 30 分鐘,甚至比 10 個 DAG 逐一執行時的 200 項工作總時間還長。

這些指標表示佇列已滿,且缺乏處理所有排定工作所需的資源。

合併工作

目前的程式碼會建立許多 DAG 和工作,但沒有足夠的資源可同時處理所有工作,導致佇列已滿。如果工作佇列中的任務保留時間過長,可能會導致工作重新排程或失敗。在這種情況下,您應選擇較少且更整合的任務。

以下的 DAG 範例會將初始範例中的任務數量從 200 變更為 20,並將等待時間從 1 秒增加到 10 秒,模擬更多整合任務,以執行相同的工作量。

將下列 DAG 範例上傳至您建立的環境。在本教學課程中,這個 DAG 的名稱為 dag_10_tasks_20_seconds_10

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 20
seconds = 10
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

評估更多整合工作對排程程序的影響:

  1. 等待 DAG 執行作業完成。

  2. 在 Airflow 使用者介面的「DAG」頁面中,按一下 dag_10_tasks_20_seconds_10 DAG。您會看到 10 個 DAG 執行作業,每個作業都有 20 個成功的任務。

  3. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  4. 在環境清單中,按一下環境名稱。「環境詳細資料」頁面隨即開啟。

  5. 前往「Logs」分頁,然後依序前往「All logs」>「Airflow logs」>「Workers」>「View in Logs Explorer」

    第二個範例有更多合併的工作,因此產生約 10 個警告和 7 個錯誤。您可以在直方圖中比較初始範例 (早期值) 和第二個範例 (後期值) 中的錯誤和警告數量。

    含有錯誤和警告的 Airflow 工作站記錄檔直方圖,顯示工作整合後的錯誤和警告數量減少
    圖 6. 工作合併後的 Airflow 工作站記錄直方圖 (按一下可放大)

    比較第一個範例與較整合的範例,您會發現第二個範例的錯誤和警告數量大幅減少。不過,由於資源超載,記錄檔中仍會顯示與暖關機相關的相同錯誤。

  6. 在「Monitoring」分頁中,選取「Workers」,然後查看圖表。

    比較第一個範例 (較早的值) 的 Airflow 工作圖表,以及第二個範例 (較多已整合的工作) 的圖表,您會發現,當工作較為整合時,佇列工作出現尖峰的時間會縮短。不過,這項作業耗時將近 10 分鐘,仍不夠理想。

    Airflow 任務隨時間變化的圖表顯示,Airflow 任務的尖峰現象持續時間比之前縮短。
    圖 7. 合併工作後的 Airflow 工作圖表 (按一下可放大)

    在「工作中的工作者」圖表中,您可以看到第一個範例 (圖表左側) 使用資源的時間比第二個範例長得多,即使兩個範例模擬的工作量相同。

    這張圖表顯示 Airflow 運作中工作站隨時間變化的趨勢,顯示運作中工作站數量在較短的時間內就增加了。
    圖 8. 工作合併後的活動工作站圖表 (按一下可放大)

    查看 worker 資源消耗量圖表。雖然在這個例子中,使用更多整合工作所需的資源與初始例子之間的差異相當大,但 CPU 使用率仍會飆升至限制的 70%。

    Airflow 工作站的 CPU 使用率圖表顯示 CPU 使用率最高可達上限的 70%
    圖 9. 工作合併後的工作站 CPU 使用率圖表 (按一下可放大)
    Airflow 工作站記憶體使用率圖表顯示記憶體用量增加,但未達上限
    圖 10. 工作合併後的工作站記憶體總用量圖表 (按一下可放大)

在一段時間內更平均地分配工作

並行工作過多會導致佇列已滿,導致工作卡在佇列中或重新排程。在先前的步驟中,您透過合併工作來減少工作數量,但輸出記錄和監控資料顯示,並行工作的數量仍不理想。

您可以實作排程,或設定同時執行的工作數量上限,藉此控制並行工作的數量。

在本教學課程中,您將在 dag_10_tasks_20_seconds_10 DAG 中新增 DAG 層級參數,以便更均勻地分配工作:

  1. max_active_runs=1 引數新增至 DAG 情境管理工具。這個引數會設定限制,在特定時間內只執行單一 DAG 例項。

  2. max_active_tasks=5 引數新增至 DAG 情境管理工具。這個引數會控制可在每個 DAG 中同時執行的任務例項數量上限。

將下列 DAG 範例上傳至您建立的環境。在本教學課程中,這個 DAG 的名稱為 dag_10_tasks_20_seconds_10_scheduled.py

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5


with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    max_active_runs=active_runs,
    max_active_tasks=active_tasks,
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

評估分配工作對排程程序的影響:

  1. 等待 DAG 執行作業完成。

  2. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  3. 在環境清單中,按一下環境名稱。「環境詳細資料」頁面隨即開啟。

  4. 前往「Logs」分頁,然後依序前往「All logs」>「Airflow logs」>「Workers」>「View in Logs Explorer」

  5. 從直方圖中,您可以看到第三個 DAG 只有少數的有效工作和執行作業,因此不會產生任何警告或錯誤,且記錄分布看起來比先前的值更平均。

    含有錯誤和警告的 Airflow 工作站記錄檔直方圖,在任務完成後會經過時間分散,並顯示沒有錯誤或警告。
    圖 11. 在將工作合併並隨時間分發後,Airflow 工作站記錄的直方圖 (按一下可放大)

dag_10_tasks_20_seconds_10_scheduled 範例中的任務具有有限的有效任務數量,且執行時不會造成資源壓力,因為任務已排入佇列。

執行上述步驟後,您就能透過合併小型工作並在一段時間內更平均地分配這些工作,進而最佳化資源使用率。

最佳化環境設定

您可以調整環境設定,確保 Airflow 工作者一律有執行排隊工作的能力。

工作站數量和工作站並行作業

您可以調整工作站數量上限,讓 Cloud Composer 在設定限制範圍內自動調整環境。

[celery]worker_concurrency 參數會定義單一 worker 可從工作佇列中挑選的工作數量上限。變更這個參數可調整單一 worker 可同時執行的工作數量。您可以覆寫這個 Airflow 設定選項來變更。根據預設,worker 並行作業會根據 worker 可容納的輕量並行工作執行個體數量來設定。也就是說,其值取決於 worker 資源限制。工作站並行處理作業的值不受環境中工作站數量的影響。

工作站數量和工作站並行工作數量會相互影響,而環境效能則高度依賴這兩個參數。您可以根據下列考量因素,選擇正確的組合:

  • 並行執行多項快速工作。如果佇列中有多個等待的作業,且工作站同時使用 CPU 和記憶體的百分比偏低,您可以提高工作站並行處理作業的數量。不過,在某些情況下,佇列可能永遠不會填滿,導致自動調度資源功能永遠不會觸發。如果小型工作在新工作站就緒前完成執行,現有工作站可以接手剩餘的工作,而新建立的工作站則不會有任何工作。

    在這種情況下,建議您增加工作站數量下限和工作站並行作業數量,以免過度縮放。

  • 同時執行多項長時間工作。工作站並行處理作業的數量過多,導致系統無法調整工作站數量。如果多項工作需要大量資源且耗時很久,高工作者並行處理可能會導致佇列永遠不會填滿,且所有工作都由單一 worker 處理,進而導致效能問題。在這種情況下,建議您增加工作站數量上限,並降低工作站並行作業數量

平行處理的重要性

Airflow 排程器會控制 DAG 執行作業和 DAG 中的個別工作排程。[core]parallelism Airflow 設定選項會控制 Airflow 排程器在滿足這些工作的所有依附元件後,可在執行緒佇列中排入多少工作。

平行處理是 Airflow 的保護機制,可決定每個排程器同時執行的工作數量,不受工作站數量影響。平行處理值乘以叢集中的排程器數量,就是環境可排入佇列的工作例項數量上限。

通常,[core]parallelism 會設為工作站數量上限和 [celery]worker_concurrency 的乘積。這也會受到的影響。您可以覆寫這個 Airflow 設定選項來變更。如要進一步瞭解如何調整與縮放相關的 Airflow 設定,請參閱「縮放 Airflow 設定」。

找出最佳環境設定

建議您將小型工作合併為大型工作,並在一段時間內更平均地分配工作,以修正排程問題。除了最佳化 DAG 程式碼,您也可以最佳化環境設定,讓系統有足夠的容量同時執行多項工作。

舉例來說,假設您盡可能在 DAG 中合併任務,但限制有效任務,以便在一段時間內更平均地分散任務,這並非特定用途的理想解決方案。

您可以調整平行處理、工作站數量和工作站並行處理參數,執行 dag_10_tasks_20_seconds_10 DAG 時不受限於有效工作。在這個範例中,DAG 會執行 10 次,每次執行包含 20 個小型工作。如要同時執行所有測試:

  • 您需要更大的環境大小,因為這會控制環境中代管 Cloud Composer 基礎架構的效能參數。

  • Airflow 工作站必須能夠同時執行 20 項工作,也就是說,您需要將 worker 並行處理作業設為 20。

  • 工作站需要足夠的 CPU 和記憶體才能處理所有工作。worker 並行作業會受到 worker CPU 和記憶體的影響,因此您需要至少 worker_concurrency / 12 CPU 和 least worker_concurrency / 8 記憶體。

  • 您必須增加平行處理量,才能配合更高的 worker 並行處理作業。為了讓 worker 從佇列中挑選 20 項工作,排程器必須先排定這 20 項工作。

調整環境設定的方式如下:

  1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  2. 在環境清單中,按一下環境名稱。「環境詳細資料」頁面隨即開啟。

  3. 前往「環境設定」分頁。

  4. 找出「資源」>「工作負載」設定,然後按一下「編輯」

  5. 在「Worker」部分的「Memory」欄位中,指定 Airflow worker 的新記憶體限制。在本教學課程中,請使用 4 GB。

  6. 在「CPU」欄位中,指定 Airflow 工作站的新 CPU 限制。在本教學課程中,請使用 2 個 vCPU。

  7. 儲存變更,並等待 Airflow 工作站重新啟動幾分鐘。

接著,覆寫平行處理和工作站並行處理 Airflow 設定選項:

  1. 前往「Airflow 設定覆寫」分頁。

  2. 依序按一下「編輯」和「新增 Airflow 設定覆寫」

  3. 覆寫平行處理設定:

    區段
    core parallelism 20
  4. 按一下「Add Airflow Configuration Override」,然後覆寫 worker 並行作業設定:

    區段
    celery worker_concurrency 20
  5. 按一下「儲存」,然後等待環境更新設定。

使用調整過的設定再次觸發相同的範例 DAG:

  1. 在 Airflow UI 中,前往「DAG」頁面。

  2. 找出 dag_10_tasks_20_seconds_10 DAG 並刪除。

    刪除 DAG 後,Airflow 會檢查環境值區中的 DAG 資料夾,並自動再次執行 DAG。

DAG 執行完畢後,請再次查看「記錄」的直方圖。從圖表中,您可以看到,在使用經過調整的環境設定執行時,具有更多整合工作任務的 dag_10_tasks_20_seconds_10 範例並未產生任何錯誤和警告。請將結果與圖表中的舊資料進行比較,在該圖表中,相同的範例在使用預設環境設定執行時,會產生錯誤和警告。

在調整環境設定後,含有錯誤和警告的 Airflow 工作站記錄檔直方圖顯示沒有錯誤和警告
圖 12. 調整環境設定後的 Airflow worker 記錄直方圖 (按一下可放大)

環境設定和 Airflow 設定在工作排程方面扮演重要角色,但無法超過特定限制值。

建議您最佳化 DAG 程式碼、整合工作,並使用排程功能,以便提升效能和效率。

範例:DAG 程式碼複雜,導致 DAG 剖析錯誤和延遲

在這個範例中,您將調查模擬 Airflow 變數過多的 DAG 範例,並分析其剖析延遲時間。

建立新的 Airflow 變數

上傳程式碼範例前,請建立新的 Airflow 變數。

  1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  2. 在「Airflow 網路伺服器」欄中,按一下環境的「Airflow」連結。

  3. 依序前往「管理」>「變數」>「新增記錄」

  4. 設定以下這些值:

    • 鍵:example_var
    • val:test_airflow_variable

將範例 DAG 上傳至環境

上傳以下 DAG 範例至您在先前步驟中建立的環境。在本教學課程中,這個 DAG 的名稱為 dag_for_loop_airflow_variable

這個 DAG 包含一個執行 1,000 次的 for 迴圈,並模擬過多的 Airflow 變數。每個迭代都會讀取 example_var 變數並產生工作。每項工作都包含一個指令,用於列印變數的值。

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable',
    default_args=default_args,
    catchup=False,
    schedule_interval="@daily"
)

for i in range(1000):
    a = Variable.get('example_var', 'N/A')
    task = BashOperator(
        task_id=f'task_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': a}
    )

診斷剖析問題

DAG 剖析時間是 Airflow 排程器讀取及剖析 DAG 檔案所需的時間長度。在 Airflow 排程器可以排定 DAG 中的任何工作之前,排程器必須剖析 DAG 檔案,以便找出 DAG 的結構和定義的工作。

如果 DAG 的剖析作業耗時過長,會耗用排程器的容量,並可能降低 DAG 執行效能。

如要監控 DAG 剖析時間,請按照下列步驟操作:

  1. 在 gcloud CLI 中執行 dags report Airflow CLI 指令,查看所有 DAG 的剖析時間:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        dags report
    

    更改下列內容:

    • ENVIRONMENT_NAME:環境名稱。
    • LOCATION:環境所在的地區。
  2. 在指令輸出中,找出 dag_for_loop_airflow_variables DAG 的時間長度值。如果值偏高,表示這個 DAG 並未以最佳方式實作。如果您有多個 DAG,可以從輸出表格中找出哪些 DAG 的剖析時間過長。

    範例:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:14.773594 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /airflow_monitoring | 0:00:00.003035 | 1       | 1        | airflow_monitoring
    .py
    
    
  3. 在 Google Cloud 控制台中檢查 DAG 剖析時間:

    1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  4. 在環境清單中,按一下環境名稱。「環境詳細資料」頁面隨即開啟。

  5. 前往「Logs」分頁,然後依序前往「All Logs」>「DAG processor manager」

  6. 查看 dag-processor-manager 記錄,找出可能的問題。

    範例 DAG 的記錄項目顯示 DAG 剖析時間為 46.3 秒
    圖 13. DAG 處理器管理工具記錄顯示 DAG 剖析時間 (按一下即可放大)

如果 DAG 的總剖析時間超過 10 秒,排程器可能會因 DAG 剖析作業過載而無法有效執行 DAG。

最佳化 DAG 程式碼

建議您不要在 DAG 中使用不必要的「頂層」Python 程式碼。如果 DAG 包含許多 DAG 外部的匯入項目、變數和函式,Airflow 排程器的剖析時間就會拉長。這會降低 Cloud Composer 和 Airflow 的效能和可擴充性。讀取過多 Airflow 變數會導致解析時間過長,並造成資料庫負載過高。如果這個程式碼位於 DAG 檔案中,這些函式會在每個排程器心跳時執行,因此可能會很慢。

Airflow 的範本欄位可讓您將 Airflow 變數和 Jinja 範本的值納入 DAG。這可避免在排程器心跳期間執行不必要的函式。

如要以更佳的方式實作 DAG 範例,請避免在 DAG 的頂層 Python 程式碼中使用 Airflow 變數。請改為透過 Jinja 範本將 Airflow 變數傳遞至現有運算子,這樣就能延後讀取值,直到工作執行時才讀取。

上傳新版範例 DAG 至環境。在本教學課程中,這個 DAG 的名稱為 dag_for_loop_airflow_variable_optimized

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable_optimized',
    default_args=default_args,
    catchup=False,
    schedule_interval='@daily'
)

for i in range(1000):
    task = BashOperator(
        task_id=f'bash_use_variable_good_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': '{{ var.value.get("example_var") }}'},
    )

檢查新的 DAG 剖析時間:

  1. 等待 DAG 執行作業完成。

  2. 再次執行 dags report 指令,查看所有 DAG 的剖析時間:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:37.000369 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /dag_for_loop_airfl | 0:00:01.109457 | 1       | 1000     | dag_for_loop_airflow
    ow_variable_optimiz |                |         |          | _variable_optimized
    ed.py               |                |         |          |
    /airflow_monitoring | 0:00:00.040510 | 1       | 1        | airflow_monitoring
    .py                 |                |         |          |
    
  3. 再次查看 dag-processor-manager 記錄,並分析剖析時間。

    範例 DAG 的記錄項目顯示 DAG 剖析時間為 4.21 秒
    圖 14. DAG 處理器管理工具記錄檔顯示 DAG 代碼最佳化後的 DAG 剖析時間 (按一下可放大)

您將環境變數替換為 Airflow 範本,簡化了 DAG 程式碼,並將剖析延遲時間縮短約十倍。

最佳化 Airflow 環境設定

Airflow 排程器會持續嘗試觸發新工作,並剖析環境集區中的所有 DAG。如果 DAG 的剖析時間很長,且排程器會消耗大量資源,您可以最佳化 Airflow 排程器設定,讓排程器更有效率地使用資源。

在本教學課程中,DAG 檔案需要花費大量時間進行剖析,且剖析週期開始重疊,進而耗盡排程器的容量。在本例中,第一個 DAG 的剖析時間超過 5 秒,因此您可以將排程器設定為較少執行,以便更有效率地使用資源。您將覆寫 scheduler_heartbeat_sec Airflow 設定選項。這項設定會定義排程器應執行的頻率 (以秒為單位)。預設值為 5 秒。您可以覆寫這個 Airflow 設定選項來變更。

覆寫 scheduler_heartbeat_sec Airflow 設定選項:

  1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  2. 在環境清單中,按一下環境名稱。「環境詳細資料」頁面隨即開啟。

  3. 前往「Airflow 設定覆寫」分頁。

  4. 依序按一下「編輯」和「新增 Airflow 設定覆寫」

  5. 覆寫 Airflow 設定選項:

    區段
    scheduler scheduler_heartbeat_sec 10
  6. 按一下「儲存」,然後等待環境更新設定。

查看排程器指標:

  1. 前往「監控」分頁,然後選取「排程器」

  2. 在「Scheduler heartbeat」圖表中,按一下「More options」按鈕 (三點圖示),然後點選「View in the Metrics Explorer」

排程器活動訊號圖表顯示活動訊號發生的頻率降低
圖 15. 排程器活動訊號圖表 (按一下可放大)

在圖表中,您會看到在將預設設定從 5 秒變更為 10 秒後,排程器的執行頻率減少了兩倍。降低心跳頻率,可確保排程器不會在先前的剖析週期持續進行時開始執行,且不會耗盡排程器的資源容量。

將更多資源指派給排程器

在 Cloud Composer 2 中,您可以為排程器分配更多 CPU 和記憶體資源。這樣一來,您就能提升排程器的效能,並加快 DAG 的剖析時間。

為排程器分配額外的 CPU 和記憶體:

  1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  2. 在環境清單中,按一下環境名稱。「環境詳細資料」頁面隨即開啟。

  3. 前往「環境設定」分頁。

  4. 找出「資源」>「工作負載」設定,然後按一下「編輯」

  5. 在「Scheduler」部分的「Memory」欄位中,指定新的記憶體限制。在本教學課程中,請使用 4 GB。

  6. 在「CPU」欄位中,指定新的 CPU 限制。在本教學課程中,請使用 2 個 vCPU。

  7. 儲存變更,並等待 Airflow 排程器重新啟動幾分鐘。

  8. 前往「Logs」分頁,然後依序前往「All Logs」>「DAG processor manager」

  9. 請查看 dag-processor-manager 記錄,並比較範例 DAG 的剖析時間長度:

    範例 DAG 的記錄項目顯示,經過最佳化的 DAG 的 DAG 剖析時間為 1.5 秒。對於未經最佳化的 DAG,剖析時間為 28.71 秒
    圖 16. DAG 處理器管理工具記錄檔顯示,在將更多資源指派給排程器後的 DAG 剖析時間 (按一下可放大)

將更多資源指派給排程器後,您可以提高排程器的容量,並大幅縮短剖析延遲時間 (相較於預設環境設定)。資源越多,排程器解析 DAG 的速度就越快,但與 Cloud Composer 資源相關的費用也會隨之增加。此外,資源數量也無法超過特定上限。

建議您在實作可能的 DAG 程式碼和 Airflow 設定最佳化後,再分配資源。

清除所用資源

如要避免系統向您的 Google Cloud 帳戶收取本教學課程所用資源的費用,請刪除含有相關資源的專案,或者保留專案但刪除個別資源。

刪除專案

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

刪除個別資源

如果打算進行多個教學課程及快速入門導覽課程,重複使用專案有助於避免超出專案配額限制。

刪除 Cloud Composer 環境。您也必須在這項程序中刪除環境的儲存桶。

後續步驟