Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本教學課程將逐步說明如何診斷及排解工作排程和剖析問題,這些問題會導致排程器故障、剖析錯誤和延遲,以及工作失敗。
簡介
Airflow 排程器主要受到兩項因素影響:工作排程和 DAG 剖析。如果其中一個因素發生問題,可能會對環境健康和效能造成負面影響。
有時會同時排定太多工作,在這種情況下,佇列會填滿,工作會維持「已排定」狀態,或在排入佇列後重新排定,這可能會導致工作失敗和效能延遲。
另一個常見問題是 DAG 程式碼複雜度造成的剖析延遲和錯誤。舉例來說,如果 DAG 程式碼在程式碼的頂層包含 Airflow 變數,可能會導致剖析延遲、資料庫過載、排程失敗和 DAG 超時。
在本教學課程中,您將診斷範例 DAG,並瞭解如何排解排程和剖析問題、改善 DAG 排程,以及最佳化 DAG 程式碼和環境設定,以提升效能。
目標
本節列出本教學課程範例的目標。
範例:排程器故障,高工作並行導致延遲
上傳會同時多次執行的範例 DAG,並使用 Cloud Monitoring 診斷排程器故障和延遲問題。
合併工作來最佳化 DAG 程式碼,並評估對效能的影響。
在一段時間內更平均地分配工作,並評估成效影響。
最佳化 Airflow 和環境設定,並評估影響。
範例:複雜程式碼導致 DAG 剖析錯誤和延遲
上傳含有 Airflow 變數的範例 DAG,並使用 Cloud Monitoring 診斷剖析問題。
避免在程式碼頂層使用 Airflow 變數,並評估對剖析時間的影響,藉此最佳化 DAG 程式碼。
最佳化 Airflow 設定和環境設定,並評估對剖析時間的影響。
費用
本教學課程使用下列 Google Cloud的計費元件:
完成本教學課程後,您可以刪除建立的資源以避免繼續計費。詳情請參閱「清除」。
事前準備
本節說明開始教學課程前必須執行的動作。
建立及設定專案
本教學課程需要 Google Cloud 專案。請按以下方式設定專案:
在 Google Cloud 控制台中,選取或建立專案:
請確認您已為專案啟用計費功能。瞭解如何檢查專案是否已啟用計費功能。
請確認 Google Cloud 專案使用者具有下列角色,可建立必要資源:
- 環境與 Storage 物件管理員
(
roles/composer.environmentAndStorageObjectAdmin
) - 運算管理員 (
roles/compute.admin
)
- 環境與 Storage 物件管理員
(
為專案啟用 API
Enable the Cloud Composer API.
建立 Cloud Composer 環境
建立環境時,您會將 Cloud Composer v2 API 服務代理人擴充角色 (roles/composer.ServiceAgentV2Ext
) 授予 Composer 服務代理人帳戶。Cloud Composer 會使用這個帳戶在專案中執行作業。 Google Cloud
範例:排程器故障,且工作排程問題導致工作失敗
這個範例會示範如何偵錯排程器故障和高工作並行性造成的延遲。
將範例 DAG 上傳至環境
上傳下列範例 DAG 至您在先前步驟中建立的環境。在本教學課程中,這個 DAG 名為 dag_10_tasks_200_seconds_1
。
這個 DAG 有 200 項工作。每個工作都會等待 1 秒,然後列印「Complete!」。上傳後,系統會自動觸發 DAG。Cloud Composer 會執行這個 DAG 10 次,且所有 DAG 執行作業都會並行進行。
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 200
seconds = 1
minutes = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
start_date=datetime(2023, 11, 22, 20, 0),
end_date=datetime(2023, 11, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
診斷排程器故障和工作失敗問題
DAG 執行完成後,開啟 Airflow UI,然後按一下 dag_10_tasks_200_seconds_1
DAG。您會看到 10 個 DAG 執行作業都成功,且每個作業都有 200 個成功執行的工作。
查看 Airflow 工作記錄:
前往 Google Cloud 控制台的「Environments」頁面。
在環境清單中,按一下環境名稱。 「環境詳細資料」頁面隨即開啟。
前往「記錄」分頁,然後依序前往「所有記錄」>「Airflow 記錄」>「工作人員」>「在記錄檔探索工具中查看」。
在記錄直方圖中,您可以看到以紅色和橘色標示的錯誤和警告:

範例 DAG 產生約 130 個警告和 60 個錯誤。按一下包含黃色和紅色長條的任何欄。您會在記錄中看到下列部分警告和錯誤:
State of this instance has been externally set to success. Terminating
instance.
Received SIGTERM. Terminating subprocesses.
worker: Warm shutdown (MainProcess).
這些記錄可能表示資源用量超出限制,因此工作人員自行重新啟動。
如果 Airflow 工作在佇列中保留太久,排程器會將其標示為失敗並等待重試,然後再次重新排程以供執行。如要觀察這種情況的徵兆,可以查看佇列工作數量的圖表,如果圖表中的尖峰在約 10 分鐘內沒有下降,則工作很可能會失敗 (沒有記錄)。
查看監控資訊:
前往「監控」分頁,然後選取「總覽」。
查看 Airflow 工作圖表。
圖 2. Airflow 工作圖表 (按一下可放大) 在 Airflow 工作圖表中,排隊任務的數量在 10 分鐘內大幅增加,這可能表示環境中的資源不足,無法處理所有排定的任務。
查看「活躍的工作站數量」圖表:
圖 3. 活躍工作站圖表 (按一下可放大) 「有效工作站」圖表顯示,DAG 在執行期間觸發自動調度資源,將工作站數量調度至允許上限 (三個)。
資源用量圖表可指出 Airflow 工作站缺乏容量,無法執行排入佇列的工作。在「Monitoring」(監控) 分頁中,選取「Workers」(工作站),然後查看「Total worker CPU usage」(工作站 CPU 總用量) 和「Total worker memory usage」(工作站記憶體總用量) 圖表。
圖 4. 工作站 CPU 使用率總計圖表 (點選即可放大) 圖 5. 工作站記憶體總用量圖表 (按一下即可放大) 圖表顯示,同時執行過多工作導致 CPU 達到上限。資源已使用超過 30 分鐘,甚至比 10 次 DAG 執行 (每次執行 200 項工作) 的總時間還長。
這些指標代表佇列已填滿,且缺乏資源來處理所有排定的工作。
整合工作
目前的程式碼會建立許多 DAG 和工作,但沒有足夠的資源可平行處理所有工作,導致佇列填滿。如果工作在佇列中停留太久,可能會導致工作重新排程或失敗。 在這種情況下,建議您選擇較少但更整合的任務。
下列 DAG 範例會將初始範例中的工作數量從 200 個變更為 20 個,並將等待時間從 1 秒增加至 10 秒,以模擬更多合併工作,但工作量相同。
上傳下列範例 DAG 至您建立的環境。在本教學課程中,這個 DAG 名為 dag_10_tasks_20_seconds_10
。
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 20
seconds = 10
minutes = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
評估整合更多工作對排程程序的影響:
等待 DAG 執行完成。
在 Airflow UI 的「DAGs」頁面中,按一下
dag_10_tasks_20_seconds_10
DAG。您會看到 10 個 DAG 執行作業,每個作業都有 20 個成功執行的工作。前往 Google Cloud 控制台的「Environments」頁面。
在環境清單中,按一下環境名稱。 「環境詳細資料」頁面隨即開啟。
前往「記錄」分頁,然後依序前往「所有記錄」>「Airflow 記錄」>「工作人員」>「在記錄檔探索工具中查看」。
第二個範例包含更多合併的作業,因此產生約 10 個警告和 7 個錯誤。在直方圖上,您可以比較初始範例 (較早的值) 和第二個範例 (較晚的值) 中的錯誤和警告數量。
圖 6. 工作合併後,Airflow 工作站記錄直方圖 (按一下即可放大) 比較第一個範例與經過整合的範例後,您會發現第二個範例的錯誤和警告數量明顯減少。不過,由於資源過載,記錄檔中仍會顯示與暖關機相關的相同錯誤。
在「監控」分頁中,選取「工作人員」並查看圖表。
比較第一個範例 (較早的值) 的 Airflow 工作圖表與第二個範例 (工作更精簡) 的圖表,您會發現工作更精簡時,排隊工作激增的時間較短。不過,這段時間接近 10 分鐘,仍不理想。
圖 7. 工作合併後的 Airflow 工作圖 (按一下可放大) 在「活躍工作者」圖表中,您可以看到第一個範例 (圖表左側) 使用的資源時間比第二個範例長得多,即使兩個範例模擬的工作量相同。
圖 8. 工作合併後的有效工作站圖表 (按一下可放大) 查看工作人員資源消耗圖表。雖然在整合更多工作的範例中,資源用量與初始範例的差異相當顯著,但 CPU 使用率仍會飆升至限制的 70%。
圖 9. 工作合併後的工作站 CPU 使用率總計圖表 (按一下即可放大) 圖 10. 工作合併後的工作站記憶體總用量圖表 (按一下可放大)
在一段時間內更平均地分配工作
如果並行工作過多,佇列就會填滿,導致工作卡在佇列中或重新排程。在先前的步驟中,您已合併工作來減少工作數量,但輸出記錄和監控結果顯示,並行工作數量仍未達到最佳狀態。
您可以實作排程,或設定可同時執行的工作數量上限,藉此控管並行執行的工作數量。
在本教學課程中,您會在 dag_10_tasks_20_seconds_10
DAG 中加入 DAG 層級的參數,讓工作在一段時間內更平均地分配:
將
max_active_runs=1
引數新增至 DAG 情境管理工具。這個引數會設定限制,在特定時間只能執行一個 DAG 執行個體。將
max_active_tasks=5
引數新增至 DAG 情境管理工具。這個引數可控制每個 DAG 中可並行執行的工作執行個體數量上限。
上傳下列範例 DAG 至您建立的環境。在本教學課程中,這個 DAG 名為 dag_10_tasks_20_seconds_10_scheduled.py
。
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
max_active_runs=active_runs,
max_active_tasks=active_tasks,
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
評估隨時間分配工作對排程程序的影響:
等待 DAG 執行完成。
前往 Google Cloud 控制台的「Environments」頁面。
在環境清單中,按一下環境名稱。 「環境詳細資料」頁面隨即開啟。
前往「記錄」分頁,然後依序前往「所有記錄」>「Airflow 記錄」>「工作人員」>「在記錄檔探索工具中查看」。
從直方圖中可以看出,第三個 DAG 的有效工作和執行次數有限,因此未產生任何警告或錯誤,且記錄的分布情形也比先前的數值更平均。
圖 11. 工作合併並隨時間分配後,Airflow 工作站記錄直方圖 (按一下可放大)
dag_10_tasks_20_seconds_10_scheduled
範例中的工作數量有限,且執行次數不多,因此不會造成資源壓力,因為工作是平均排入佇列。
完成上述步驟後,您已整合小型工作,並在一段時間內更平均地分配這些工作,進而最佳化資源用量。
最佳化環境設定
您可以調整環境設定,確保 Airflow 工作人員一律有足夠容量執行已加入佇列的工作。
工作站數量和工作站並行
您可以調整工作站數量上限,讓 Cloud Composer 在設定的限制內自動調度環境資源。
[celery]worker_concurrency
參數會定義單一工作者可從工作佇列中擷取的工作數量上限。變更這個參數會調整單一工作站可同時執行的工作數量。如要變更這個 Airflow 設定選項,請覆寫該選項。根據預設,工作站並行數是根據工作站可容納的輕量型並行工作執行個體數量設定。也就是說,這個值取決於工作站資源限制。工作站並行值與環境中的工作站數量無關。
工作站數量和工作站並行數會相互搭配運作,環境效能則高度取決於這兩個參數。您可以根據下列考量事項,選擇正確的組合:
並行執行多項快速工作。如果佇列中有等待執行的工作,且工作站的 CPU 和記憶體使用率偏低,您可以增加工作站並行數。不過,在某些情況下,佇列可能永遠不會填滿,導致自動調度資源功能永遠不會觸發。如果小型工作在新的工作站準備就緒前完成執行,現有工作站可以接手處理剩餘工作,新建立的工作站則不會有任何工作。
在這種情況下,建議增加工作站數量下限和工作站並行數,避免過度積極的調整作業。
多個長時間執行的工作並行運作。工作站並行數過高,導致系統無法調整工作站數量。如果多項工作耗用大量資源,且需要很長時間才能完成,高工作並行可能會導致佇列永遠無法填滿,所有工作都只由一個工作人員接手,進而導致效能問題。在這些情況下,建議增加工作站數量上限,並減少工作站並行數。
平行處理的重要性
Airflow 排程器會控管 DAG 執行作業和 DAG 中個別工作的排程。[core]parallelism
Airflow 設定選項會控管 Airflow 排程器在滿足這些工作的所有依附元件後,可在執行器的佇列中排入多少工作。
平行處理是 Airflow 的保護機制,可決定每個排程器可同時執行的工作數量,與工作站數量無關。平行處理任務數量乘以叢集中的排程器數量,就是環境可加入佇列的任務執行個體數量上限。
通常 [core]parallelism
會設為工作站數量上限與 [celery]worker_concurrency
的乘積。這項指標也會受到集區影響。如要變更這個 Airflow 設定選項,請覆寫該選項。如要進一步瞭解如何調整與擴縮相關的 Airflow 設定,請參閱「擴縮 Airflow 設定」。
找出最佳環境設定
建議您將小型工作整合成大型工作,並更平均地分配工作,以修正排程問題。除了最佳化 DAG 程式碼,您也可以最佳化環境設定,確保有足夠的容量可同時執行多項工作。
舉例來說,假設您盡可能在 DAG 中合併工作,但限制有效工作以更平均地分散工作時間,並非特定用途的偏好解決方案。
您可以調整平行處理、工作站數量和工作站並行參數,執行 dag_10_tasks_20_seconds_10
DAG,而不限制有效工作。在本例中,DAG 會執行 10 次,每次執行都包含 20 個小型工作。如要同時執行所有測試:
您需要較大的環境大小,因為這會控管環境代管 Cloud Composer 基礎架構的效能參數。
Airflow 工作站必須能夠同時執行 20 項工作,因此您需要將工作站並行數設為 20。
工作站需要足夠的 CPU 和記憶體才能處理所有工作。Worker 並行會受到 worker CPU 和記憶體影響,因此您至少需要
worker_concurrency / 12
個 CPU 和least worker_concurrency / 8
的記憶體。您需要提高平行處理量,才能配合更高的工作站並行數。 如要讓工作人員從佇列中挑選 20 項工作,排程器必須先排定這 20 項工作。
請按照下列方式調整環境設定:
前往 Google Cloud 控制台的「Environments」頁面。
在環境清單中,按一下環境名稱。 「環境詳細資料」頁面隨即開啟。
前往「環境設定」分頁。
找出「Resources」>「Workloads」設定,然後按一下「Edit」。
在「工作站」部分的「記憶體」欄位中,指定 Airflow 工作站的新記憶體限制。在本教學課程中,請使用 4 GB。
在「CPU」欄位中,指定 Airflow 工作站的新 CPU 限制。在本教學課程中,請使用 2 個 vCPU。
儲存變更,並等待幾分鐘,讓 Airflow 工作站重新啟動。
接著,請覆寫平行處理量和工作站並行 Airflow 設定選項:
前往「Airflow Configuration Overrides」(Airflow 設定覆寫) 分頁。
按一下「編輯」,然後按一下「新增 Airflow 設定覆寫」。
覆寫平行處理設定:
區段 鍵 值 core
parallelism
20
按一下「Add Airflow Configuration Override」(新增 Airflow 設定覆寫),然後覆寫工作站並行設定:
區段 鍵 值 celery
worker_concurrency
20
按一下「儲存」,等待環境更新設定。
使用調整後的設定再次觸發相同的範例 DAG:
在 Airflow 使用者介面中,前往「DAGs」頁面。
找出
dag_10_tasks_20_seconds_10
DAG 並刪除。刪除 DAG 後,Airflow 會檢查環境值區中的 DAG 資料夾,並自動再次執行 DAG。
DAG 執行完畢後,請再次查看「記錄」直方圖。在圖表中,您可以看到 dag_10_tasks_20_seconds_10
範例在執行時,由於環境設定經過調整,因此沒有產生任何錯誤和警告,且任務更加精簡。比較結果與圖表中的早期資料,其中相同的範例在以預設環境設定執行時產生錯誤和警告。

環境設定和 Airflow 設定在排定工作時扮演重要角色,但設定無法超過特定限制。
建議您最佳化 DAG 程式碼、整合工作,並使用排程功能,以提升效能和效率。
範例:DAG 剖析錯誤和延遲時間,原因為 DAG 程式碼複雜
在本範例中,您會調查模仿過多 Airflow 變數的範例 DAG 的剖析延遲。
建立新的 Airflow 變數
上傳程式碼範例前,請先建立新的 Airflow 變數。
前往 Google Cloud 控制台的「Environments」頁面。
在「Airflow 網路伺服器」欄中,按一下環境的「Airflow」連結。
依序前往「管理」>「變數」> 「新增記錄」。
設定以下這些值:
- 按鍵:
example_var
- 值:
test_airflow_variable
- 按鍵:
將範例 DAG 上傳至環境
上傳下列範例 DAG 至您在先前步驟中建立的環境。在本教學課程中,這個 DAG 名為 dag_for_loop_airflow_variable
。
這個 DAG 包含會執行 1,000 次的 for 迴圈,並模擬過多的 Airflow 變數。每次疊代都會讀取 example_var
變數,並產生工作。每項工作都包含一個指令,用於列印變數的值。
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 2, 17),
'retries': 0
}
dag = DAG(
'dag_for_loop_airflow_variable',
default_args=default_args,
catchup=False,
schedule_interval="@daily"
)
for i in range(1000):
a = Variable.get('example_var', 'N/A')
task = BashOperator(
task_id=f'task_{i}',
bash_command='echo variable foo=${foo_env}',
dag=dag,
env={'foo_env': a}
)
診斷剖析問題
DAG 剖析時間是指 Airflow 排程器讀取及剖析 DAG 檔案所需的時間。Airflow 排程器必須先剖析 DAG 檔案,找出 DAG 的結構和定義的任務,才能排定 DAG 中的任何任務。
如果 DAG 需要很長時間才能剖析,就會耗用排程器的容量,並可能降低 DAG 執行的效能。
如要監控 DAG 剖析時間,請按照下列步驟操作:
在 gcloud CLI 中執行
dags report
Airflow CLI 指令,即可查看所有 DAG 的剖析時間:gcloud composer environments run ENVIRONMENT_NAME \ --location LOCATION \ dags report
更改下列內容:
ENVIRONMENT_NAME
:環境名稱。LOCATION
:環境所在的區域。
在指令輸出中,尋找
dag_for_loop_airflow_variables
DAG 的持續時間值。如果值很大,可能表示這個 DAG 的實作方式不夠理想。如果您有多個 DAG,可以從輸出資料表找出剖析時間較長的 DAG。範例:
file | duration | dag_num | task_num | dags ====================+================+=========+==========+===================== /dag_for_loop_airfl | 0:00:14.773594 | 1 | 1000 | dag_for_loop_airflow ow_variable.py | | | | _variable /airflow_monitoring | 0:00:00.003035 | 1 | 1 | airflow_monitoring .py
在 Google Cloud 控制台中檢查 DAG 解析時間:
- 前往 Google Cloud 控制台的「Environments」頁面。
在環境清單中,按一下環境名稱。 「環境詳細資料」頁面隨即開啟。
前往「記錄」分頁,然後依序前往「所有記錄」>「DAG 處理器管理工具」。
查看
dag-processor-manager
記錄並找出可能的問題。圖 13. DAG 處理器管理工具記錄檔會顯示 DAG 剖析時間 (按一下即可放大)
如果 DAG 剖析總時間超過約 10 秒,排程器可能會因 DAG 剖析作業而過載,無法有效執行 DAG。
最佳化 DAG 程式碼
建議您避免在 DAG 中使用不必要的「頂層」Python 程式碼。如果 DAG 包含許多匯入項目、變數和函式 (位於 DAG 外部),Airflow 排程器剖析 DAG 的時間就會變長。這會降低 Cloud Composer 和 Airflow 的效能和可擴充性。讀取過多 Airflow 變數會導致剖析時間過長,以及資料庫負載過高。如果這段程式碼位於 DAG 檔案中,這些函式會在每個排程器心跳時執行,速度可能會較慢。
Airflow 的範本欄位可讓您將 Airflow 變數和 Jinja 範本的值併入 DAG。這樣可避免在排程器心跳期間執行不必要的函式。
如要以更完善的方式實作 DAG 範例,請避免在 DAG 的頂層 Python 程式碼中使用 Airflow 變數。請改為透過 Jinja 範本將 Airflow 變數傳遞至現有運算子,這樣系統就會延遲讀取值,直到工作執行為止。
上傳新版範例 DAG 至環境。在本教學課程中,這個 DAG 名為 dag_for_loop_airflow_variable_optimized
。
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 2, 17),
'retries': 0
}
dag = DAG(
'dag_for_loop_airflow_variable_optimized',
default_args=default_args,
catchup=False,
schedule_interval='@daily'
)
for i in range(1000):
task = BashOperator(
task_id=f'bash_use_variable_good_{i}',
bash_command='echo variable foo=${foo_env}',
dag=dag,
env={'foo_env': '{{ var.value.get("example_var") }}'},
)
檢查新的 DAG 剖析時間:
等待 DAG 執行完畢。
再次執行
dags report
指令,查看所有 DAG 的剖析時間:file | duration | dag_num | task_num | dags ====================+================+=========+==========+===================== /dag_for_loop_airfl | 0:00:37.000369 | 1 | 1000 | dag_for_loop_airflow ow_variable.py | | | | _variable /dag_for_loop_airfl | 0:00:01.109457 | 1 | 1000 | dag_for_loop_airflow ow_variable_optimiz | | | | _variable_optimized ed.py | | | | /airflow_monitoring | 0:00:00.040510 | 1 | 1 | airflow_monitoring .py | | | |
再次查看
dag-processor-manager
記錄,並分析剖析時間長度。圖 14. DAG 處理器管理工具記錄檔會顯示 DAG 程式碼最佳化後的 DAG 剖析時間 (按一下即可放大)
將環境變數替換為 Airflow 範本後,DAG 程式碼變得更簡潔,剖析延遲時間也減少約十倍。
最佳化 Airflow 環境設定
Airflow 排程器會持續嘗試觸發新工作,並剖析環境 bucket 中的所有 DAG。如果 DAG 的剖析時間較長,且排程器耗用大量資源,您可以最佳化 Airflow 排程器設定,讓排程器更有效率地使用資源。
在本教學課程中,DAG 檔案需要很長時間才能剖析,且剖析週期開始重疊,導致排程器的容量耗盡。在我們的範例中,第一個範例 DAG 的剖析時間超過 5 秒,因此您會設定排程器,降低執行頻率,以更有效率地使用資源。您將覆寫 scheduler_heartbeat_sec
Airflow 設定選項。這項設定會定義排程器應執行的頻率 (以秒為單位)。預設值為 5 秒。
如要變更這個 Airflow 設定選項,請覆寫該選項。
覆寫 scheduler_heartbeat_sec
Airflow 設定選項:
前往 Google Cloud 控制台的「Environments」頁面。
在環境清單中,按一下環境名稱。 「環境詳細資料」頁面隨即開啟。
前往「Airflow Configuration Overrides」(Airflow 設定覆寫) 分頁。
按一下「編輯」,然後按一下「新增 Airflow 設定覆寫」。
覆寫 Airflow 設定選項:
區段 鍵 值 scheduler
scheduler_heartbeat_sec
10
按一下「儲存」,等待環境更新設定。
查看排程器指標:
前往「監控」分頁,然後選取「排程器」。
在「排程器心跳」圖表中,按一下「更多選項」按鈕 (三個點),然後按一下「在 Metrics Explorer 中查看」。

在圖表中,您會看到將預設設定從 5 秒變更為 10 秒後,排程器執行頻率減少兩倍。降低心跳頻率可確保排程器不會在先前的剖析週期進行時開始執行,且排程器的資源容量不會耗盡。
為排程器指派更多資源
在 Cloud Composer 2 中,您可以為排程器分配更多 CPU 和記憶體資源。這樣一來,您就能提升排程器的效能,並縮短 DAG 的剖析時間。
為排程器分配額外的 CPU 和記憶體:
前往 Google Cloud 控制台的「Environments」頁面。
在環境清單中,按一下環境名稱。 「環境詳細資料」頁面隨即開啟。
前往「環境設定」分頁。
找出「Resources」>「Workloads」設定,然後按一下「Edit」。
在「Scheduler」(排程器) 部分的「Memory」(記憶體) 欄位中,指定新的記憶體限制。在本教學課程中,請使用 4 GB。
在「CPU」欄位中,指定新的 CPU 限制。在本教學課程中,請使用 2 個 vCPU。
儲存變更,並等待幾分鐘,讓 Airflow 排程器重新啟動。
前往「記錄」分頁,然後依序前往「所有記錄」>「DAG 處理器管理工具」。
查看
dag-processor-manager
記錄檔,並比較範例 DAG 的剖析時間長度:圖 16. DAG 處理器管理工具記錄檔顯示,在為排程器指派更多資源後,DAG 剖析時間有所改善 (按一下即可放大)
將更多資源指派給排程器後,排程器的容量就會增加,與預設環境設定相比,剖析延遲時間也會大幅縮短。資源越多,排程器剖析 DAG 的速度就越快,但 Cloud Composer 資源的相關費用也會增加。此外,資源也無法超過特定限制。
建議您先實作可能的 DAG 程式碼和 Airflow 設定最佳化,再分配資源。
清除所用資源
如要避免系統向您的 Google Cloud 帳戶收取本教學課程所用資源的費用,請刪除含有相關資源的專案,或者保留專案但刪除個別資源。
刪除專案
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
刪除個別資源
如果打算進行多個教學課程及快速入門導覽課程,重複使用專案有助於避免超出專案配額限制。
刪除 Cloud Composer 環境。您也可以在執行這項程序時刪除環境的 bucket。