Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本教學課程提供相關步驟,說明如何使用記錄和環境監控功能,在 Cloud Composer 中偵錯失敗的 Airflow DAG,並診斷工作站資源相關問題,例如工作站記憶體或儲存空間不足。
簡介
本教學課程著重於資源相關問題,示範如何對 DAG 進行偵錯。
如果沒有分配足夠的工作站資源,DAG 就會失敗。如果 Airflow 工作記憶體或儲存空間不足,您可能會看到 Airflow 例外狀況,例如:
WARNING airflow.exceptions.AirflowException: Task received SIGTERM signal
INFO - Marking task as FAILED.
或
Task exited with return code Negsignal.SIGKILL
在這種情況下,一般建議是增加 Airflow 工作站資源,或減少每個工作站的任務數量。不過,由於 Airflow 例外狀況可能是一般狀況,因此找出造成問題的特定資源可能並不容易。
本教學課程說明如何診斷 DAG 失敗的原因,以及如何透過偵錯兩個因工作站記憶體和儲存空間不足而失敗的 DAG 範例,找出造成問題的資源類型。
目標
執行因下列原因而失敗的 DAG 範例:
- 工作站記憶體不足
- 工作站儲存空間不足
診斷失敗原因
增加分配的工作站資源
使用新的資源限制測試 DAG
費用
本教學課程使用下列 Google Cloud的計費元件:
完成本教學課程後,您可以刪除建立的資源以避免繼續計費。詳情請參閱「清除」。
事前準備
本節說明開始教學課程前必須執行的動作。
建立及設定專案
本教學課程需要 Google Cloud 專案。請按以下方式設定專案:
在 Google Cloud 控制台中,選取或建立專案:
請確認您已為專案啟用計費功能。瞭解如何檢查專案是否已啟用計費功能。
請確認 Google Cloud 專案使用者具有下列角色,可建立必要資源:
- 環境與 Storage 物件管理員
(
roles/composer.environmentAndStorageObjectAdmin
) - 運算管理員 (
roles/compute.admin
) - 「監控編輯者」 (
roles/monitoring.editor
)
- 環境與 Storage 物件管理員
(
為專案啟用 API
Enable the Cloud Composer API.
建立 Cloud Composer 環境
建立環境時,您會將 Cloud Composer v2 API 服務代理人擴充角色 (roles/composer.ServiceAgentV2Ext
) 授予 Composer 服務代理人帳戶。Cloud Composer 會使用這個帳戶在專案中執行作業。 Google Cloud
檢查工作站資源限制
檢查環境中的 Airflow 工作站資源限制:
前往 Google Cloud 控制台的「Environments」頁面。
在環境清單中,按一下環境名稱。 「環境詳細資料」頁面隨即開啟。
前往「環境設定」分頁。
依序前往「資源」>「工作負載設定」 >「工作人員」。
確認值為 0.5 個 vCPU、1.875 GB 記憶體和 1 GB 儲存空間。 這些是 Airflow 工作站的資源限制,您會在教學課程的後續步驟中使用。
範例:診斷記憶體不足問題
上傳下列範例 DAG 至您在先前步驟中建立的環境。在本教學課程中,這個 DAG 名為 create_list_with_many_strings
。
這個 DAG 包含一項工作,會執行下列步驟:
- 建立空白清單
s
。 - 執行週期,將
More
字串附加至清單。 - 列印清單耗用的記憶體量,並在每 1 分鐘的疊代中等待 1 秒。
import time
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import sys
from datetime import timedelta
default_args = {
'start_date': airflow.utils.dates.days_ago(0),
'retries': 0,
'retry_delay': timedelta(minutes=10)
}
dag = DAG(
'create_list_with_many_strings',
default_args=default_args,
schedule_interval=None)
def consume():
s = []
for i in range(120):
for j in range(1000000):
s.append("More")
print(f"i={i}; size={sys.getsizeof(s) / (1000**3)}GB")
time.sleep(1)
t1 = PythonOperator(
task_id='task0',
python_callable=consume,
dag=dag,
depends_on_past=False,
retries=0
)
觸發範例 DAG
觸發範例 DAG create_list_with_many_strings
:
前往 Google Cloud 控制台的「Environments」頁面。
在「Airflow 網路伺服器」欄中,按一下環境的「Airflow」連結。
在 Airflow 網頁介面中,前往「DAGs」頁面,在 DAG 的「Links」資料欄中,按一下「Trigger Dag」按鈕。
按一下「觸發條件」。
在「DAG」頁面中,點選您觸發的工作,然後查看輸出記錄,確認 DAG 已開始執行。
工作執行期間,輸出記錄會列印 DAG 使用的記憶體大小 (以 GB 為單位)。
幾分鐘後,工作會因超出 Airflow 工作站的 1.875 GB 記憶體限制而失敗。
診斷失敗的 DAG
如果失敗時正在執行多項工作,請考慮只執行一項工作,並診斷該期間的資源壓力,找出造成資源壓力的工作,以及需要增加的資源。
查看 Airflow 工作記錄
請注意,create_list_with_many_strings
DAG 中的工作具有 Failed
狀態。
查看工作記錄。您會看到下列記錄項目:
```none
{local_task_job.py:102} INFO - Task exited with return code
Negsignal.SIGKILL
```
`Netsignal.SIGKILL` might be an indication of your task using more memory
than the Airflow worker is allocated. The system sends
the `Negsignal.SIGKILL` signal to avoid further memory consumption.
查看工作負載
檢查工作負載,確認工作負載不會導致 Pod 執行的節點超出記憶體用量上限:
前往 Google Cloud 控制台的「Environments」頁面。
在環境清單中,按一下環境名稱。 「環境詳細資料」頁面隨即開啟。
前往「環境設定」分頁。
在「資源」>「GKE 叢集」>「工作負載」中,按一下「查看叢集工作負載」。
檢查部分工作負載 Pod 是否具有類似下列的狀態:
Error with exit code 137 and 1 more issue. ContainerStatusUnknown with exit code 137 and 1 more issue
Exit code 137
表示容器或 Pod 嘗試使用的記憶體超出允許範圍。系統會終止程序,避免記憶體用量過高。
查看環境健康狀態和資源用量監控
查看環境健康狀態和資源用量監控:
前往 Google Cloud 控制台的「Environments」頁面。
在環境清單中,按一下環境名稱。 「環境詳細資料」頁面隨即開啟。
前往「監控」分頁,然後選取「總覽」。
在「環境總覽」面板中,找到「環境健康狀態 (Airflow 監控 DAG)」圖表。其中包含紅色區域,對應於記錄開始列印錯誤的時間。
選取「工作站」,然後找到「工作站記憶體總用量」圖表。 請注意,「記憶體用量」行在工作執行時出現尖峰。

即使圖表上的記憶體用量線未達到上限,診斷失敗原因時,您仍需考量個別工作人員的記憶體用量。每個工作站都會使用部分記憶體執行其他容器,以執行工作站運作所需的動作,例如將 DAG 檔案與環境的 bucket 同步處理。工作站可用的實際記憶體量會低於記憶體上限,因此無法執行 Airflow 工作。如果工作站達到可用實際記憶體上限,執行的工作可能會因工作站記憶體不足而失敗。在這種情況下,即使總工作站記憶體用量圖表上的線條未達到記憶體上限,您仍可能會觀察到工作失敗。
提高工作站記憶體上限
分配額外的工作站記憶體,確保範例 DAG 成功執行:
前往 Google Cloud 控制台的「Environments」頁面。
在環境清單中,按一下環境名稱。 「環境詳細資料」頁面隨即開啟。
前往「環境設定」分頁。
找出「Resources」>「Workloads」設定,然後按一下「Edit」。
在「工作站」部分的「記憶體」欄位中,指定 Airflow 工作站的新記憶體限制。在本教學課程中,請使用 3 GB。
儲存變更,並等待幾分鐘,讓 Airflow 工作站重新啟動。
使用新的記憶體限制測試 DAG
再次觸發 create_list_with_many_strings
DAG,並等待執行完畢。
在 DAG 執行作業的輸出記錄中,您會看到
Marking task as SUCCESS
,且工作狀態會顯示「成功」。查看「監控」分頁中的「環境總覽」部分,確認沒有任何紅色區域。
按一下「工作站」部分,然後找出「工作站記憶體總用量」圖表。您會看到「記憶體上限」行反映記憶體上限的變更,「記憶體用量」行遠低於實際可分配的記憶體上限。
範例:診斷儲存空間不足問題
在這個步驟中,您會上傳兩個會建立大型檔案的 DAG。第一個 DAG 會建立大型檔案。第二個 DAG 會建立大型檔案,並模擬長時間執行的作業。
兩個 DAG 中的檔案大小都超過預設的 Airflow 工作站儲存空間上限 1 GB,但第二個 DAG 有額外的等待工作,可人為延長其持續時間。
您將在後續步驟中,調查這兩個 DAG 的行為差異。
上傳會建立大型檔案的 DAG
上傳下列範例 DAG 至您在先前步驟中建立的環境。在本教學課程中,這個 DAG 名為 create_large_txt_file_print_logs
。
這個 DAG 包含一項工作,會執行下列步驟:
- 將 1.5 GB 的
localfile.txt
檔案寫入 Airflow 工作站儲存空間。 - 使用 Python
os
模組列印所建立檔案的大小。 - 每 1 分鐘列印一次 DAG 執行作業的持續時間。
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import os
from datetime import timedelta
import time
default_args = {
'start_date': airflow.utils.dates.days_ago(0),
'retries': 0,
'retry_delay': timedelta(minutes=10)
}
dag = DAG(
'create_large_txt_file_print_logs',
default_args=default_args,
schedule_interval=None)
def consume():
size = 1000**2 # bytes in 1 MB
amount = 100
def create_file():
print(f"Start creating a huge file")
with open("localfile.txt", "ab") as f:
for j in range(15):
f.write(os.urandom(amount) * size)
print("localfile.txt size:", os.stat("localfile.txt").st_size / (1000**3), "GB")
create_file()
print("Success!")
t1 = PythonOperator(
task_id='create_huge_file',
python_callable=consume,
dag=dag,
depends_on_past=False,
retries=0)
上傳 DAG,在長時間執行的作業中建立大型檔案
如要模擬長時間執行的 DAG,並調查工作持續時間對最終狀態的影響,請上傳第二個範例 DAG 至環境。在本教學課程中,這個 DAG 名為 long_running_create_large_txt_file_print_logs
。
這個 DAG 包含一項工作,會執行下列步驟:
- 將 1.5 GB 的
localfile.txt
檔案寫入 Airflow 工作站儲存空間。 - 使用 Python
os
模組列印所建立檔案的大小。 - 等待 1 小時 15 分鐘,模擬處理檔案作業所需的時間,例如讀取檔案。
- 每 1 分鐘列印一次 DAG 執行作業的持續時間。
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import os
from datetime import timedelta
import time
default_args = {
'start_date': airflow.utils.dates.days_ago(0),
'retries': 0,
'retry_delay': timedelta(minutes=10)
}
dag = DAG(
'long_running_create_large_txt_file_print_logs',
default_args=default_args,
schedule_interval=None)
def consume():
size = 1000**2 # bytes in 1 MB
amount = 100
def create_file():
print(f"Start creating a huge file")
with open("localfile.txt", "ab") as f:
for j in range(15):
f.write(os.urandom(amount) * size)
print("localfile.txt size:", os.stat("localfile.txt").st_size / (1000**3), "GB")
create_file()
for k in range(75):
time.sleep(60)
print(f"{k+1} minute")
print("Success!")
t1 = PythonOperator(
task_id='create_huge_file',
python_callable=consume,
dag=dag,
depends_on_past=False,
retries=0)
觸發範例 DAG
觸發第一個 DAG create_large_txt_file_print_logs
:
前往 Google Cloud 控制台的「Environments」頁面。
在「Airflow 網路伺服器」欄中,按一下環境的「Airflow」連結。
在 Airflow 網頁介面中,前往「DAGs」頁面,在 DAG 的「Links」資料欄中,按一下「Trigger Dag」按鈕。
按一下「觸發條件」。
在「DAG」頁面中,點選您觸發的工作,然後查看輸出記錄,確認 DAG 已開始執行。
等待使用
create_large_txt_file_print_logs
DAG 建立的工作完成。這可能需要幾分鐘的時間。在「DAGs」頁面中,按一下 DAG 執行作業。即使超出儲存空間上限,工作仍會顯示
Success
狀態。
查看工作 Airflow 記錄:
前往 Google Cloud 控制台的「Environments」頁面。
在環境清單中,按一下環境名稱。 「環境詳細資料」頁面隨即開啟。
前往「記錄」分頁,然後依序前往「所有記錄」>「Airflow 記錄」>「工作人員」>「在記錄檔探索工具中查看」。
依類型篩選記錄:只顯示「錯誤」訊息。
記錄中會顯示類似下列的訊息:
Worker: warm shutdown (Main Process)
或
A worker pod was evicted at 2023-12-01T12:30:05Z with message: Pod ephemeral
local storage usage exceeds the total limit of containers 1023Mi.
這些記錄表示 Pod 已啟動「暖關機」程序,因為使用的儲存空間超過上限,且在 1 小時內遭到逐出。不過,DAG 執行作業並未失敗,因為作業在 Kubernetes 終止寬限期內完成,本教學課程會進一步說明。
如要說明終止寬限期的概念,請查看第二個範例 DAG long_running_create_large_txt_file_print_logs
的結果。
觸發第二個 DAG long_running_create_large_txt_file_print_logs
:
前往 Google Cloud 控制台的「Environments」頁面。
在「Airflow 網路伺服器」欄中,按一下環境的「Airflow」連結。
在 Airflow 網頁介面中,前往「DAGs」頁面,在 DAG 的「Links」資料欄中,按一下「Trigger Dag」按鈕。
按一下「觸發條件」。
在「DAG」頁面中,點選您觸發的工作,然後查看輸出記錄,確認 DAG 已開始執行。
等待
long_running_create_large_txt_file_print_logs
DAG 執行作業失敗。這項程序大約需要一小時。
查看 DAG 執行結果:
在「DAGs」頁面中,按一下
long_running_create_large_txt_file_print_logs
DAG 執行作業。您會看到工作處於Failed
狀態,且執行時間正好是 1 小時 5 分鐘,少於工作的等待時間 1 小時 15 分鐘。查看工作記錄。DAG 在 Airflow 工作站的容器中建立
localfile.txt
檔案後,記錄檔會顯示 DAG 開始等待,且每 1 分鐘會在工作記錄中顯示執行時間。在本例中,DAG 會列印localfile.txt size:
記錄,而localfile.txt
檔案的大小為 1.5 GB。
一旦寫入 Airflow 工作站容器的檔案超過儲存空間上限,DAG 執行作業就會失敗。不過,工作不會立即失敗,而是會持續執行,直到時間達到 1 小時 5 分鐘為止。這是因為 Kubernetes 不會立即終止工作,而是會繼續執行,讓工作有 1 小時的復原時間,也就是「終止寬限期」。節點資源用盡時,Kubernetes 不會立即終止 Pod,而是會妥善處理終止作業,盡量減少對使用者的影響。
終止寬限期可協助使用者在工作失敗後復原檔案,但診斷 DAG 時可能會造成混淆。如果超過 Airflow 工作站的儲存空間限制,最終工作狀態會視 DAG 執行時間長度而定:
如果 DAG 執行作業超出工作站儲存空間限制,但在一小時內完成,則工作會在終止寬限期內完成,因此狀態為
Success
。不過,Kubernetes 會終止 Pod,並立即從容器中刪除寫入的檔案。如果 DAG 超過工作站儲存空間限制,且執行時間超過 1 小時,DAG 會持續執行 1 小時,且儲存空間用量可能會超過限制數千個百分比,之後 Kubernetes 會淘汰 Pod,而 Airflow 會將工作標示為
Failed
。
診斷失敗的 DAG
如果失敗時正在執行多項工作,請考慮只執行一項工作,並診斷該期間的資源壓力,找出造成資源壓力的工作,以及需要增加的資源。
查看第二個 DAG 的工作記錄,
long_running_create_large_txt_file_print_logs
:
前往 Google Cloud 控制台的「Environments」頁面。
在環境清單中,按一下環境名稱。 「環境詳細資料」頁面隨即開啟。
前往「記錄」分頁,然後依序前往「所有記錄」>「Airflow 記錄」>「工作人員」>「在記錄檔探索工具中查看」。
依類型篩選記錄:只顯示「錯誤」訊息。
記錄中會顯示類似下列的訊息:
Container storage usage of worker reached 155.7% of the limit.
This likely means that the total size of local files generated by your DAGs is
close to the storage limit of worker.
You may need to decrease the storage usage or increase the worker storage limit
in your Cloud Composer environment configuration.
或
Pod storage usage of worker reached 140.2% of the limit.
A worker pod was evicted at 2023-12-01T12:30:05Z with message: Pod ephemeral
local storage usage exceeds the total limit of containers 1023Mi.
This eviction likely means that the total size of dags and plugins folders plus
local files generated by your DAGs exceeds the storage limit of worker.
Please decrease the storage usage or increase the worker storage limit in your
Cloud Composer environment configuration.
這些訊息表示隨著工作進展,當 DAG 產生的檔案大小超過工作站儲存空間限制時,Airflow 記錄開始列印錯誤,並開始終止寬限期。在終止寬限期內,儲存空間用量未回到上限,因此終止寬限期結束後,Pod 遭到逐出。
查看環境健康狀態和資源用量監控:
前往 Google Cloud 控制台的「Environments」頁面。
在環境清單中,按一下環境名稱。 「環境詳細資料」頁面隨即開啟。
前往「監控」分頁,然後選取「總覽」。
在「環境總覽」面板中,找到「環境健康狀態 (Airflow 監控 DAG)」圖表。其中包含紅色區域,對應於記錄開始列印錯誤的時間。
選取「工作站」,然後找到「工作站磁碟總用量」圖表。請注意,「磁碟用量」線在工作執行時會出現尖峰,並超過「磁碟限制」線。

提高工作人員儲存空間上限
分配額外的 Airflow 工作站儲存空間,確保範例 DAG 成功執行:
前往 Google Cloud 控制台的「Environments」頁面。
在環境清單中,按一下環境名稱。 「環境詳細資料」頁面隨即開啟。
前往「環境設定」分頁。
找出「Resources」>「Workloads」設定,然後按一下「Edit」。
在「工作站」部分的「儲存空間」欄位中,指定 Airflow 工作站的新儲存空間上限。在本教學課程中,請將其設為 2 GB。
儲存變更,並等待幾分鐘,讓 Airflow 工作站重新啟動。
使用新的儲存空間上限測試 DAG
再次觸發 long_running_create_large_txt_file_print_logs
DAG,並等待 1 小時 15 分鐘,直到 DAG 執行完畢。
在 DAG 執行的輸出記錄中,您會看到
Marking task as SUCCESS
, 且工作狀態會顯示「成功」,持續時間為 1 小時 15 分鐘,這等於 DAG 程式碼中設定的等待時間。查看「監控」分頁中的「環境總覽」部分,確認沒有任何紅色區域。
按一下「工作站」部分,然後找到「工作站磁碟總用量」圖表。您會發現「磁碟限制」行反映了儲存空間限制的變更,「磁碟用量」行則在允許的範圍內。
摘要
在本教學課程中,您已診斷 DAG 失敗的原因,並透過偵錯兩個因工作站記憶體和儲存空間不足而失敗的 DAG 範例,找出造成壓力的資源類型。然後為工作人員分配更多記憶體和儲存空間,成功執行 DAG。不過,建議您先最佳化 DAG (工作流程),減少工作站資源耗用量,因為資源無法超過特定門檻。
清除所用資源
如要避免系統向您的 Google Cloud 帳戶收取本教學課程所用資源的費用,請刪除含有相關資源的專案,或者保留專案但刪除個別資源。
刪除專案
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
刪除個別資源
如果打算進行多個教學課程及快速入門導覽課程,重複使用專案有助於避免超出專案配額限制。
刪除 Cloud Composer 環境。您也可以在執行這項程序時刪除環境的 bucket。