Cloud Composer 3 | Cloud Composer 2 | Cloud Composer�
本頁面說明如何將 DAG、資料和設定從現有的 Airflow 1.10.* 環境,轉移至 Airflow 2 和後續 Airflow 版本的環境。
其他遷移指南
寄件者 | 收件者 | 方法 | 指南 |
---|---|---|---|
Cloud Composer 2 | Cloud Composer 3 | 並排使用遷移指令碼 | 指令碼遷移指南 |
Cloud Composer 2 | Cloud Composer 3 | 並排比較快照 | 快照遷移指南 |
Cloud Composer 1、Airflow 2 | Cloud Composer 3 | 並排比較快照 | 快照遷移指南 |
Cloud Composer 1、Airflow 2 | Cloud Composer 2 | 並排比較快照 | 快照遷移指南 |
Cloud Composer 1、Airflow 2 | Cloud Composer 2 | 並排比較,手動轉移 | 手動遷移指南 |
Cloud Composer 1、Airflow 1 | Cloud Composer 2、Airflow 2 | 並排比較快照 | 快照遷移指南 |
Cloud Composer 1、Airflow 1 | Cloud Composer 2、Airflow 2 | 並排比較,手動轉移 | 手動遷移指南 |
Cloud Composer 1、Airflow 1 | Cloud Composer 1、Airflow 2 | 並排比較,手動轉移 | 本指南 |
並行升級
Cloud Composer 提供 Cloud Composer 資料庫轉移指令碼,可將中繼資料庫、DAG、資料和外掛程式,從 Airflow 1.10.14 和 Airflow 1.10.15 的 Cloud Composer 環境,遷移至 Airflow 2.0.1 和後續 Airflow 版本的現有 Cloud Composer 環境。
這是本指南所述路徑的替代路徑。使用提供的指令碼時,本指南的部分內容仍適用。舉例來說,您可能想在遷移 DAG 前檢查 DAG 是否與 Airflow 2 相容,或確保不會同時執行多個 DAG,且沒有額外或遺漏的 DAG 執行作業。
事前準備
開始使用 Airflow 2 的 Cloud Composer 環境前,請先瞭解 Airflow 2 為 Cloud Composer 環境帶來的變更。
多個排程器
您可以在環境中使用多個 Airflow 排程器。您可以在建立環境時設定排程器數量,也可以更新現有環境。
Celery+Kubernetes 執行器
Cloud Composer 3 支援 Airflow 2 Celery+Kubernetes Executor。
破壞性變更
Airflow 2 導入了許多重大變更,其中有些是破壞性變更:
- 我們無法保證 Airflow 1.10.* 的現有 DAG 能與 Airflow 2 搭配運作。因此需要測試,並視情況調整。
- 運算子、轉移和掛鉤已遷移至供應商套件。DAG 中的匯入陳述式必須使用新的供應商套件。舊的匯入陳述式可能無法在 Airflow 2 中運作。
- Airflow 2不再支援特定設定選項,因此部分 Airflow 1.10.* 設定可能不再受支援。
- 部分自訂 PyPI 套件可能與新版 Airflow 或 Python 不相容。
- Airflow UI with Access Control 是預設的 Airflow 2 UI。Airflow 2 不支援其他 Airflow UI 類型。
- 實驗性 REST API 已由穩定版 Airflow API 取代。在 Airflow 2 中,實驗性 REST API 預設為停用。
- Airflow 2.0.0 的其他重大變更
- Airflow 2.0.1 的其他重大變更
Airflow 2 和 Airflow 1.10.* 環境的差異
Airflow 1.10.* 版 Cloud Composer 環境與 Airflow 2 版環境的主要差異:
- 使用 Airflow 2 的環境會使用 Python 3.8。這個版本比 Airflow 1.10.* 環境中使用的版本更新。不支援 Python 2、Python 3.6 和 Python 3.7。
- Airflow 2 使用不同的 CLI 格式。在 Airflow 2 環境中,Cloud Composer 支援透過
gcloud composer environments run
指令使用新格式。 - Airflow 2 環境中預先安裝的 PyPI 套件有所不同。如需預先安裝的 PyPI 套件清單,請參閱 Cloud Composer 版本清單。
- Airflow 2 一律會啟用 DAG 序列化功能。因此,您不再需要非同步載入 DAG,Airflow 2 也不支援這項功能。因此,Airflow 2 不支援設定
[core]store_serialized_dags
和[core]store_dag_code
參數,嘗試設定時會回報錯誤。 - 系統不支援 Airflow 網頁伺服器外掛程式。這不會影響排程器或工作站外掛程式,包括 Airflow 運算子和感應器。
- 在 Airflow 2 環境中,預設的 Airflow 使用者角色為
Op
。如果是 Airflow 1.10.* 環境,預設角色為Admin
。
步驟 1:確認是否與 Airflow 2 相容
如要檢查是否可能與 Airflow 2 發生衝突,請參閱「升級至 Airflow 2.0 以上版本」指南的「升級 DAG」一節。
您可能會遇到不相容匯入路徑的常見問題。如要進一步瞭解如何解決這個相容性問題,請參閱「升級至 Airflow 2.0 以上版本」指南的回溯移植供應商相關章節。
步驟 2:建立 Airflow 2 環境,轉移設定覆寫和環境變數
建立 Airflow 2 環境,並轉移設定覆寫和環境變數:
按照建立環境的步驟操作。建立環境前,請先指定設定覆寫和環境變數,詳情請參閱下文。
選取映像檔時,請選擇 Airflow 2 映像檔。
手動將設定參數從 Airflow 1.10.* 環境轉移至新的 Airflow 2 環境。
控制台
建立環境時,請展開「網路、Airflow 設定覆寫與其他功能」部分。
在「Airflow configuration overrides」(Airflow 設定覆寫) 底下,按一下「Add Airflow configuration override」(新增 Airflow 設定覆寫)。
從 Airflow 1.10.* 環境複製所有設定覆寫。
部分設定選項在 Airflow 2 中會使用不同的名稱和區段。詳情請參閱「設定變更」。
在「Environment variables」(環境變數) 下,按一下「Add environment variable」(新增環境變數)。
從 Airflow 1.10.* 環境複製所有環境變數。
按一下「建立」即可建立環境。
步驟 3:將 PyPI 套件安裝到 Airflow 2 環境
建立 Airflow 2 環境後,請在其中安裝 PyPI 套件:
控制台
前往 Google Cloud 控制台的「Environments」頁面。
選取 Airflow 2 環境。
前往「PyPI packages」(PyPI 套件) 分頁,然後按一下「Edit」(編輯)。
從 Airflow 1.10.* 環境複製 PyPI 套件需求條件。 按一下「儲存」,然後等待環境更新。
由於 Airflow 2 環境使用不同的預先安裝套件組合和 Python 版本,您可能會遇到難以解決的 PyPI 套件衝突。
步驟 4:將變數和集區轉移至 Airflow 2
Airflow 1.10.* 支援將變數和集區匯出至 JSON 檔案。然後將這些檔案匯入 Airflow 2 環境。
如果除了 default_pool
以外,您還有其他自訂集區,才需要轉移集區。否則請略過匯出及匯入集區的指令。
gcloud
從 Airflow 1.10.* 環境匯出變數:
gcloud composer environments run AIRFLOW_1_ENV \ --location AIRFLOW_1_LOCATION \ variables -- -e /home/airflow/gcs/data/variables.json
取代:
AIRFLOW_1_ENV
改為 Airflow 1.10.* 環境的名稱。AIRFLOW_1_LOCATION
改成環境所在的地區。
從 Airflow 1.10.* 環境匯出集區:
gcloud composer environments run AIRFLOW_1_ENV \ --location AIRFLOW_1_LOCATION \ pool -- -e /home/airflow/gcs/data/pools.json
取得 Airflow 2 環境值區 URI。
執行下列指令:
gcloud composer environments describe AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ --format="value(config.dagGcsPrefix)"
取代:
AIRFLOW_2_ENV
改為 Airflow 2 環境的名稱。AIRFLOW_2_LOCATION
改成環境所在的地區。
在輸出內容中,移除
/dags
資料夾。結果是 Airflow 2 環境 bucket 的 URI。舉例來說,將
gs://us-central1-example-916807e1-bucket/dags
變更為gs://us-central1-example-916807e1-bucket
。
將含有變數和集區的 JSON 檔案轉移至 Airflow 2 環境:
gcloud composer environments storage data export \ --destination=AIRFLOW_2_BUCKET/data \ --environment=AIRFLOW_1_ENV \ --location=AIRFLOW_1_LOCATION \ --source=variables.json gcloud composer environments storage data export \ --destination=AIRFLOW_2_BUCKET/data \ --environment=AIRFLOW_1_ENV \ --location=AIRFLOW_1_LOCATION \ --source=pools.json
將
AIRFLOW_2_BUCKET
替換為上一個步驟中取得的 Airflow 2 環境值區 URI。將變數和集區匯入 Airflow 2:
gcloud composer environments run \ AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ variables import \ -- /home/airflow/gcs/data/variables.json gcloud composer environments run \ AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ pools import \ -- /home/airflow/gcs/data/pools.json
確認變數和集區已匯入:
gcloud composer environments run \ AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ variables list gcloud composer environments run \ AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ pools list
從 bucket 移除 JSON 檔案:
gcloud composer environments storage data delete \ variables.json \ --environment=AIRFLOW_2_ENV \ --location=AIRFLOW_2_LOCATION gcloud composer environments storage data delete \ pools.json \ --environment=AIRFLOW_2_ENV \ --location=AIRFLOW_2_LOCATION gcloud composer environments storage data delete \ variables.json \ --environment=AIRFLOW_1_ENV \ --location=AIRFLOW_1_LOCATION gcloud composer environments storage data delete \ pools.json \ --environment=AIRFLOW_1_ENV \ --location=AIRFLOW_1_LOCATION
步驟 5:從 Airflow 1.10.* 環境值區轉移其他資料
gcloud
將外掛程式轉移至 Airflow 2 環境。如要這麼做,請將外掛程式從 Airflow 1.10.* 環境 bucket 匯出至 Airflow 2 環境 bucket 中的
/plugins
資料夾:gcloud composer environments storage plugins export \ --destination=AIRFLOW_2_BUCKET/plugins \ --environment=AIRFLOW_1_ENV \ --location=AIRFLOW_1_LOCATION
確認
/plugins
資料夾是否已成功匯入:gcloud composer environments storage plugins list \ --environment=AIRFLOW_2_ENV \ --location=AIRFLOW_2_LOCATION
將 Airflow 1.10.* 環境中的
/data
資料夾匯出至 Airflow 2 環境:gcloud composer environments storage data export \ --destination=AIRFLOW_2_BUCKET/data \ --environment=AIRFLOW_1_ENV \ --location=AIRFLOW_1_LOCATION
確認
/data
資料夾是否已成功匯入:gcloud composer environments storage data list \ --environment=AIRFLOW_2_ENV \ --location=AIRFLOW_2_LOCATION
步驟 6:轉移連線和使用者
Airflow 1.10.* 不支援匯出使用者和連線。如要轉移使用者和連線,請在 Airflow 2 環境中手動建立新的使用者帳戶和連線。
gcloud
如要取得 Airflow 1.10.* 環境中的連線清單,請執行:
gcloud composer environments run AIRFLOW_1_ENV \ --location AIRFLOW_1_LOCATION \ connections -- --list
如要在 Airflow 2 環境中建立新連線,請透過 gcloud 執行
connections
Airflow CLI 指令。例如:gcloud composer environments run \ AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ connections add \ -- --conn-host postgres.example.com \ --conn-port 5432 \ --conn-type postgres \ --conn-login example_user \ --conn-password example_password \ --conn-description "Example connection" \ example_connection
如要查看 Airflow 1.10.* 環境中的使用者清單,請按照下列步驟操作:
依序前往「管理」 >「使用者」。
如要在 Airflow 2 環境中建立新使用者帳戶,請透過 gcloud 執行
users create
Airflow CLI 指令。例如:gcloud composer environments run \ AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ users create \ -- --username example_username \ --firstname Example-Name \ --lastname Example-Surname \ --email example-user@example.com \ --use-random-password \ --role Admin
步驟 7:確認 DAG 已準備好在 Airflow 2 中執行
將 DAG 轉移至 Airflow 2 環境前,請確認下列事項:
DAG 執行作業成功,且沒有其他相容性問題。
DAG 使用正確的匯入陳述式。
舉例來說,
BigQueryCreateDataTransferOperator
的新匯入陳述式可能如下所示:from airflow.providers.google.cloud.operators.bigquery_dts \ import BigQueryCreateDataTransferOperator
您的 DAG 已升級為 Airflow 2。這項變更與 Airflow 1.10.14 以上版本相容。
步驟 8:將 DAG 轉移至 Airflow 2 環境
在環境之間轉移 DAG 時,可能會發生下列潛在問題:
如果兩個環境都啟用 DAG (未暫停),每個環境都會按照排定的時間執行自己的 DAG 副本。這可能會導致相同資料和執行時間的 DAG 同時執行。
由於 DAG 補追,Airflow 會從 DAG 中指定的開始日期開始,排定額外的 DAG 執行作業。這是因為新的 Airflow 執行個體不會將 1.10.* 環境中的 DAG 執行記錄納入考量。這可能會導致系統從指定開始日期起排定大量 DAG 執行作業。
防止 DAG 同時執行
在 Airflow 2 環境中,覆寫 dags_are_paused_at_creation
Airflow 設定選項。變更後,所有新的 DAG 都會預設暫停。
區段 | 鍵 | 值 |
---|---|---|
core |
dags_are_paused_at_creation |
True |
避免 DAG 執行作業過多或過少
在轉移至 Airflow 2 環境的 DAG 中,指定新的靜態開始日期。
為避免執行日期出現間隔和重疊,下一次排定間隔發生時,應在 Airflow 2 環境中執行第一個 DAG。如要這麼做,請在 DAG 中將新的開始日期設為 Airflow 1.10.* 環境中上次執行的日期之前。
舉例來說,假設您的 DAG 每天會在 Airflow 1.10.* 環境中於 15:00、17:00 和 21:00 執行,而您打算在 15:15 轉移 DAG,則 Airflow 2 環境的開始日期可以是當天 14:45。在 Airflow 2 環境中啟用 DAG 後,Airflow 會排定在 17:00 執行 DAG。
再舉一例,假設您的 DAG 每天會在 Airflow 1.10.* 環境中於 00:00 執行,最後一次執行 DAG 是在 2021 年 4 月 26 日 00:00,且您打算在 2021 年 4 月 26 日 13:00 轉移 DAG,則 Airflow 2 環境的開始日期可以是 2021 年 4 月 25 日 23:45。在 Airflow 2 環境中啟用 DAG 後,Airflow 會排定在 2021 年 4 月 27 日 00:00 執行 DAG。
將 DAG 逐一轉移至 Airflow 2 環境
請針對每個 DAG 按照下列程序進行轉移:
請確認 DAG 中的新開始日期已如上一節所述設定。
將更新後的 DAG 上傳至 Airflow 2 環境。由於設定覆寫,這個 DAG 已在 Airflow 2 環境中暫停,因此目前尚未排定任何 DAG 執行作業。
在 Airflow 網頁介面中,前往「DAGs」,檢查是否有回報的 DAG 語法錯誤。
在您打算轉移 DAG 時:
在 Airflow 1.10.* 環境中暫停 DAG。
在 Airflow 2 環境中取消暫停 DAG。
確認新的 DAG 執行作業是否已排定在正確的時間執行。
等待 Airflow 2 環境執行 DAG,並檢查是否成功。
視 DAG 執行作業是否成功而定:
如果 DAG 執行作業成功,即可繼續操作,並從 Airflow 2 環境使用 DAG。最後,請考慮刪除 Airflow 1.10.* 版本的 DAG。
如果 DAG 執行作業失敗,請嘗試排解 DAG 問題,直到 DAG 在 Airflow 2 中順利執行為止。
如有需要,您隨時可以還原為 Airflow 1.10.* 版 DAG:
在 Airflow 2 環境中暫停 DAG。
在 Airflow 1.10.* 環境中取消暫停 DAG。系統會為失敗的 DAG 執行作業排定新的執行作業,日期和時間與失敗的執行作業相同。
準備好繼續使用 Airflow 2 版本的 DAG 時,請調整開始日期,將新版 DAG 上傳至 Airflow 2 環境,然後重複上述程序。
步驟 9:監控 Airflow 2 環境
將所有 DAG 和設定轉移至 Airflow 2 環境後,請監控潛在問題、失敗的 DAG 執行作業,以及整體環境健康狀態。如果 Airflow 2 環境順利運作一段時間,即可移除 Airflow 1.10.* 環境。