Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本頁面說明 Airflow 的排程和 DAG 觸發程序運作方式、如何定義 DAG 的排程,以及如何手動觸發或暫停 DAG。
關於 Cloud Composer 中的 Airflow DAG
Cloud Composer 中的 Airflow DAG 會在專案的一或多個 Cloud Composer 環境中執行。您需要將 Airflow DAG 的來源檔案上傳到與環境相關聯的 Cloud Storage bucket。環境的 Airflow 執行個體會剖析這些檔案,並根據每個 DAG 的排程定義,排定 DAG 執行作業。在 DAG 執行期間,Airflow 會排定並執行 DAG 中定義的個別工作。
如要進一步瞭解 Airflow 的核心概念,例如 Airflow DAG、DAG 執行作業、工作或運算子,請參閱 Airflow 說明文件中的「核心概念」頁面。
關於 Airflow 中的 DAG 排程
Airflow 提供下列概念,用於排程機制:
- 邏輯日期
代表執行特定 DAG 執行的日期。
這不是 Airflow 執行 DAG 的實際日期,而是特定 DAG 執行作業必須處理的時間範圍。舉例來說,如果 DAG 排定在每天 12:00 執行,則特定日期的邏輯日期也會是 12:00。由於每天執行兩次,因此必須處理過去 12 小時的資料。同時,DAG 本身定義的邏輯可能完全不會使用邏輯日期或時間間隔。舉例來說,DAG 可能每天執行一次相同指令碼,但不會使用邏輯日期的值。
在 2.2 之前的 Airflow 版本中,這個日期稱為「執行日期」。
- 執行日期
代表特定 DAG 執行作業的執行日期。
舉例來說,如果 DAG 排定在每天 12:00 執行,DAG 實際執行時間可能會是 12:05,也就是邏輯日期通過後的一段時間。
- 排程間隔
代表 DAG 必須執行的時間和頻率 (以邏輯日期為準)。
舉例來說,如果排定每日執行,DAG 每天會執行一次,且 DAG 執行的邏輯日期間隔為 24 小時。
- 開始日期
指定 Airflow 開始排定 DAG 時間。
DAG 中的工作可以有各自的開始日期,也可以為所有工作指定單一開始日期。Airflow 會根據 DAG 中任務的最早開始日期和排程間隔,排定 DAG 執行作業。
- 補追、回填和重試
執行過去日期的 DAG 執行作業的機制。
補追功能會執行尚未執行的 DAG 執行作業,例如 DAG 暫停很長一段時間後取消暫停。您可以使用回填功能,針對特定日期範圍執行 DAG 執行作業。重試次數:指定 Airflow 執行 DAG 中的工作時,必須嘗試的次數。
排程的運作方式如下:
開始日期過後,Airflow 會等待下一個排程間隔。
Airflow 會排定第一個 DAG 執行作業,在排程間隔結束時執行。
舉例來說,如果 DAG 排定每小時執行一次,且開始日期為今天 12:00,則第一次 DAG 執行時間為今天 13:00。
本文的「排定 Airflow DAG 的時間」一節說明如何運用這些概念,為 DAG 設定排程。如要進一步瞭解 DAG 執行和排程,請參閱 Airflow 說明文件中的「DAG Runs」。
關於觸發 DAG 的方式
Airflow 提供下列 DAG 觸發方式:
依排程觸發。Airflow 會根據 DAG 檔案中指定的排程,自動觸發 DAG。
手動觸發。您可以從Google Cloud 主控台、Airflow UI 手動觸發 DAG,也可以從 Google Cloud CLI 執行 Airflow CLI 指令。
在發生事件時觸發。觸發 DAG 的標準方式是使用感應器,以回應事件。
其他觸發 DAG 的方式:
以程式輔助方式觸發。您可以使用 Airflow REST API 觸發 DAG。例如從 Python 指令碼。
以程式輔助方式觸發,回應事件。您可以使用 Cloud Run functions 和 Airflow REST API,在發生事件時觸發 DAG。
事前準備
- 確認您的帳戶具備可管理環境 bucket 中的物件,以及查看和觸發 DAG 的角色。詳情請參閱存取權控管一文。
排定 Airflow DAG
您可以在 DAG 檔案中定義 DAG 的排程。以以下方式編輯 DAG 的定義:
在電腦上找出並編輯 DAG 檔案。如果沒有 DAG 檔案,可以從環境的 bucket 下載副本。如果是新的 DAG,您可以在建立 DAG 檔案時定義所有參數。
在
schedule_interval
參數中定義排程。您可以使用 Cron 運算式 (例如0 0 * * *
),或預設值 (例如@daily
)。詳情請參閱 Airflow 說明文件中的「Cron and Time Intervals」(Cron 和時間間隔)。Airflow 會根據您設定的排程,決定 DAG 執行作業的邏輯日期。
在
start_date
參數中定義開始日期。Airflow 會使用這項參數,判斷第一個 DAG 執行的邏輯日期。
(選用) 在
catchup
參數中,定義 Airflow 是否必須執行這個 DAG 從開始日期到目前日期之間的所有先前執行作業 (尚未執行的作業)。在補追期間執行的 DAG 執行作業,其邏輯日期會是過去的日期,而執行日期則會反映 DAG 執行作業實際執行的時間。
(選用) 在
retries
參數中,定義 Airflow 必須重試失敗工作的次數 (每個 DAG 包含一或多個個別工作)。根據預設,Cloud Composer 中的工作會重試兩次。將新版 DAG 上傳至環境的 bucket。
等待 Airflow 成功剖析 DAG。舉例來說,您可以在Google Cloud 控制台或 Airflow UI 中,查看環境中的 DAG 清單。
下列 DAG 定義每天會在 00:00 和 12:00 執行兩次。開始日期設為 2024 年 1 月 1 日,但由於補追功能已停用,因此您上傳或暫停 DAG 後,Airflow 不會針對過去的日期執行 DAG。
DAG 包含一個名為 insert_query_job
的工作,可使用 BigQueryInsertJobOperator
運算子將資料列插入資料表。這個運算子是 Google Cloud BigQuery 運算子之一,可用於管理資料集和資料表、執行查詢及驗證資料。如果這項工作的特定執行作業失敗,Airflow 會再重試四次,並使用預設的重試間隔。這些重試的邏輯日期維持不變。
這個資料列的 SQL 查詢會使用 Airflow 範本,將 DAG 的邏輯日期和名稱寫入資料列。
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
with DAG(
"bq_example_scheduling_dag",
start_date=datetime.datetime(2024, 1, 1),
schedule_interval='0 */12 * * *',
catchup=False
) as dag:
insert_query_job = BigQueryInsertJobOperator(
task_id="insert_query_job",
retries=4,
configuration={
"query": {
# schema: date (string), description (string)
# example row: "20240101T120000", "DAG run: <DAG: bq_example_scheduling_dag>"
"query": "INSERT example_dataset.example_table VALUES ('{{ ts_nodash }}', 'DAG run: {{ dag }}' )",
"useLegacySql": False,
"priority": "BATCH",
}
},
location="us-central1"
)
insert_query_job
更多排程參數範例
下列排程參數範例說明排程如何搭配不同參數組合運作:
如果
start_date
為datetime(2024, 4, 4, 16, 25)
,而schedule_interval
為30 16 * * *
,則第一個 DAG 執行作業會在 2024 年 4 月 5 日下午 4:30 執行。如果
start_date
為datetime(2024, 4, 4, 16, 35)
,且schedule_interval
為30 16 * * *
,則第一個 DAG 執行作業會在 2024 年 4 月 6 日下午 4:30 執行。由於開始日期晚於 2024 年 4 月 4 日的排程間隔,因此 DAG 執行作業不會在 2024 年 4 月 5 日發生。因此,排程間隔會在 2024 年 4 月 5 日 16:35 結束,下一次 DAG 執行作業則會排定在隔天 16:30。如果
start_date
為datetime(2024, 4, 4)
,且schedule_interval
為@daily
,則第一個 DAG 執行作業的排定時間為 2024 年 4 月 5 日 00:00。如果
start_date
是datetime(2024, 4, 4, 16, 30)
,且schedule_interval
是0 * * * *
,則系統會將第一個 DAG 執行作業排定在 2024 年 4 月 4 日 18:00 執行。指定日期和時間過後,Airflow 會排定 DAG 執行作業,在每小時的第 0 分鐘執行。最接近的時間點是 17:00。此時,Airflow 會排定 DAG 執行作業,在排程間隔結束時 (也就是 18:00) 執行。
手動觸發 DAG
手動觸發 Airflow DAG 時,Airflow 會執行一次 DAG,不受 DAG 檔案中指定排程的影響。
主控台
如要從 Google Cloud 控制台觸發 DAG,請按照下列步驟操作:
前往 Google Cloud 控制台的「Environments」頁面。
選取環境即可查看詳細資料。
在「環境詳細資料」頁面中,前往「DAG」分頁。
按一下 DAG 名稱。
在「DAG details」(DAG 詳細資料) 頁面中,按一下「Trigger DAG」(觸發 DAG)。系統會建立新的 DAG 執行作業。
Airflow UI
如要透過 Airflow UI 觸發 DAG,請按照下列步驟操作:
前往 Google Cloud 控制台的「Environments」頁面。
在「Airflow 網路伺服器」欄中,按一下環境的「Airflow」連結。
登入具有適當權限的 Google 帳戶。
在 Airflow 網頁介面的「DAGs」頁面中,按一下 DAG「Actions」欄中的「Trigger DAG」按鈕。
gcloud
執行 dags trigger
Airflow CLI 指令:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags trigger -- DAG_ID
更改下列內容:
ENVIRONMENT_NAME
:環境名稱。LOCATION
:環境所在的區域。DAG_ID
:DAG 的名稱。
如要進一步瞭解如何在 Cloud Composer 環境中執行 Airflow CLI 指令,請參閱「執行 Airflow CLI 指令」。
如要進一步瞭解可用的 Airflow CLI 指令,請參閱 gcloud composer environments run
指令參考資料。
查看 DAG 執行記錄檔和詳細資料
在 Google Cloud 控制台中,您可以:
- 查看過去的 DAG 執行作業狀態和 DAG 詳細資料。
- 查看詳細記錄,瞭解這些 DAG 的所有 DAG 執行作業和所有工作。
- 查看 DAG 統計資料。
此外,Cloud Composer 也提供 Airflow UI 的存取權,這是 Airflow 專屬的網頁介面。
會合併專案中所有 Cloud Composer 3 環境的所有 Airflow DAG 排程資訊,以及 BigQuery 中提供的其他類型資產。暫停 DAG
主控台
如要從 Google Cloud 控制台暫停 DAG,請按照下列步驟操作:
前往 Google Cloud 控制台的「Environments」頁面。
選取環境即可查看詳細資料。
在「環境詳細資料」頁面中,前往「DAG」分頁。
按一下 DAG 名稱。
在「DAG details」(DAG 詳細資料) 頁面中,按一下「Pause DAG」(暫停 DAG)。
Airflow UI
如要透過 Airflow UI 暫停 DAG,請按照下列步驟操作:
- 前往 Google Cloud 控制台的「Environments」頁面。
在「Airflow 網路伺服器」欄中,按一下環境的「Airflow」連結。
登入具有適當權限的 Google 帳戶。
在 Airflow 網頁介面的「DAGs」頁面中,按一下 DAG 名稱旁的切換按鈕。
gcloud
執行 dags pause
Airflow CLI 指令:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags pause -- DAG_ID
更改下列內容:
ENVIRONMENT_NAME
:環境名稱。LOCATION
:環境所在的區域。DAG_ID
:DAG 的名稱。
如要進一步瞭解如何在 Cloud Composer 環境中執行 Airflow CLI 指令,請參閱「執行 Airflow CLI 指令」。
如要進一步瞭解可用的 Airflow CLI 指令,請參閱 gcloud composer environments run
指令參考資料。