將環境遷移至 Airflow 2

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer„�

本頁面說明如何將 DAG、資料和設定從現有的 Airflow 1.10.* 環境,轉移至 Airflow 2 和後續 Airflow 版本的環境。

其他遷移指南

寄件者 收件者 方法 指南
Cloud Composer 2 Cloud Composer 3 並排使用遷移指令碼 指令碼遷移指南
Cloud Composer 2 Cloud Composer 3 並排比較快照 快照遷移指南
Cloud Composer 1、Airflow 2 Cloud Composer 3 並排比較快照 快照遷移指南
Cloud Composer 1、Airflow 2 Cloud Composer 2 並排比較快照 快照遷移指南
Cloud Composer 1、Airflow 2 Cloud Composer 2 並排比較,手動轉移 手動遷移指南
Cloud Composer 1、Airflow 1 Cloud Composer 2、Airflow 2 並排比較快照 快照遷移指南
Cloud Composer 1、Airflow 1 Cloud Composer 2、Airflow 2 並排比較,手動轉移 手動遷移指南
Cloud Composer 1、Airflow 1 Cloud Composer 1、Airflow 2 並排比較,手動轉移 本指南

並行升級

Cloud Composer 提供 Cloud Composer 資料庫轉移指令碼,可將中繼資料庫、DAG、資料和外掛程式,從 Airflow 1.10.14 和 Airflow 1.10.15 的 Cloud Composer 環境,遷移至 Airflow 2.0.1 和後續 Airflow 版本的現有 Cloud Composer 環境。

這是本指南所述路徑的替代路徑。使用提供的指令碼時,本指南的部分內容仍適用。舉例來說,您可能想在遷移 DAG 前檢查 DAG 是否與 Airflow 2 相容,或確保不會同時執行多個 DAG,且沒有額外或遺漏的 DAG 執行作業。

事前準備

開始使用 Airflow 2 的 Cloud Composer 環境前,請先瞭解 Airflow 2 為 Cloud Composer 環境帶來的變更。

多個排程器

您可以在環境中使用多個 Airflow 排程器。您可以在建立環境時設定排程器數量,也可以更新現有環境

Celery+Kubernetes 執行器

Cloud Composer 3 支援 Airflow 2 Celery+Kubernetes Executor

破壞性變更

Airflow 2 導入了許多重大變更,其中有些是破壞性變更:

Airflow 2 和 Airflow 1.10.* 環境的差異

Airflow 1.10.* 版 Cloud Composer 環境與 Airflow 2 版環境的主要差異:

  • 使用 Airflow 2 的環境會使用 Python 3.8。這個版本比 Airflow 1.10.* 環境中使用的版本更新。不支援 Python 2、Python 3.6 和 Python 3.7。
  • Airflow 2 使用不同的 CLI 格式。在 Airflow 2 環境中,Cloud Composer 支援透過 gcloud composer environments run 指令使用新格式。
  • Airflow 2 環境中預先安裝的 PyPI 套件有所不同。如需預先安裝的 PyPI 套件清單,請參閱 Cloud Composer 版本清單
  • Airflow 2 一律會啟用 DAG 序列化功能。因此,您不再需要非同步載入 DAG,Airflow 2 也不支援這項功能。因此,Airflow 2 不支援設定 [core]store_serialized_dags[core]store_dag_code 參數,嘗試設定時會回報錯誤。
  • 系統不支援 Airflow 網頁伺服器外掛程式。這不會影響排程器或工作站外掛程式,包括 Airflow 運算子和感應器。
  • 在 Airflow 2 環境中,預設的 Airflow 使用者角色Op。如果是 Airflow 1.10.* 環境,預設角色為 Admin

步驟 1:確認是否與 Airflow 2 相容

如要檢查是否可能與 Airflow 2 發生衝突,請參閱「升級至 Airflow 2.0 以上版本」指南的「升級 DAG」一節。

您可能會遇到不相容匯入路徑的常見問題。如要進一步瞭解如何解決這個相容性問題,請參閱「升級至 Airflow 2.0 以上版本」指南的回溯移植供應商相關章節

步驟 2:建立 Airflow 2 環境,轉移設定覆寫和環境變數

建立 Airflow 2 環境,並轉移設定覆寫和環境變數:

  1. 按照建立環境的步驟操作。建立環境前,請先指定設定覆寫和環境變數,詳情請參閱下文。

  2. 選取映像檔時,請選擇 Airflow 2 映像檔

  3. 手動將設定參數從 Airflow 1.10.* 環境轉移至新的 Airflow 2 環境。

    控制台

    1. 建立環境時,請展開「網路、Airflow 設定覆寫與其他功能」部分。

    2. 在「Airflow configuration overrides」(Airflow 設定覆寫) 底下,按一下「Add Airflow configuration override」(新增 Airflow 設定覆寫)

    3. 從 Airflow 1.10.* 環境複製所有設定覆寫。

      部分設定選項在 Airflow 2 中會使用不同的名稱和區段。詳情請參閱「設定變更」。

    4. 在「Environment variables」(環境變數) 下,按一下「Add environment variable」(新增環境變數)

    5. 從 Airflow 1.10.* 環境複製所有環境變數。

    6. 按一下「建立」即可建立環境。

步驟 3:將 PyPI 套件安裝到 Airflow 2 環境

建立 Airflow 2 環境後,請在其中安裝 PyPI 套件:

控制台

  1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  2. 選取 Airflow 2 環境。

  3. 前往「PyPI packages」(PyPI 套件) 分頁,然後按一下「Edit」(編輯)

  4. 從 Airflow 1.10.* 環境複製 PyPI 套件需求條件。 按一下「儲存」,然後等待環境更新。

    由於 Airflow 2 環境使用不同的預先安裝套件組合和 Python 版本,您可能會遇到難以解決的 PyPI 套件衝突。

步驟 4:將變數和集區轉移至 Airflow 2

Airflow 1.10.* 支援將變數和集區匯出至 JSON 檔案。然後將這些檔案匯入 Airflow 2 環境。

如果除了 default_pool 以外,您還有其他自訂集區,才需要轉移集區。否則請略過匯出及匯入集區的指令。

gcloud

  1. 從 Airflow 1.10.* 環境匯出變數:

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         variables -- -e /home/airflow/gcs/data/variables.json
    

    取代:

    • AIRFLOW_1_ENV 改為 Airflow 1.10.* 環境的名稱。
    • AIRFLOW_1_LOCATION 改成環境所在的地區。
  2. 從 Airflow 1.10.* 環境匯出集區:

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         pool -- -e /home/airflow/gcs/data/pools.json
    
  3. 取得 Airflow 2 環境值區 URI。

    1. 執行下列指令:

      gcloud composer environments describe AIRFLOW_2_ENV \
          --location AIRFLOW_2_LOCATION \
           --format="value(config.dagGcsPrefix)"
      

      取代:

      • AIRFLOW_2_ENV 改為 Airflow 2 環境的名稱。
      • AIRFLOW_2_LOCATION 改成環境所在的地區。
    2. 在輸出內容中,移除 /dags 資料夾。結果是 Airflow 2 環境 bucket 的 URI。

      舉例來說,將 gs://us-central1-example-916807e1-bucket/dags 變更為 gs://us-central1-example-916807e1-bucket

  4. 將含有變數和集區的 JSON 檔案轉移至 Airflow 2 環境:

    gcloud composer environments storage data export \
        --destination=AIRFLOW_2_BUCKET/data \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION \
        --source=variables.json
    
    gcloud composer environments storage data export \
        --destination=AIRFLOW_2_BUCKET/data \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION \
        --source=pools.json
    

    AIRFLOW_2_BUCKET 替換為上一個步驟中取得的 Airflow 2 環境值區 URI。

  5. 將變數和集區匯入 Airflow 2:

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        variables import \
        -- /home/airflow/gcs/data/variables.json
    
    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        pools import \
        -- /home/airflow/gcs/data/pools.json
    
  6. 確認變數和集區已匯入:

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        variables list
    
    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        pools list
    
  7. 從 bucket 移除 JSON 檔案:

    gcloud composer environments storage data delete \
        variables.json \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
    gcloud composer environments storage data delete \
        pools.json \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
    gcloud composer environments storage data delete \
        variables.json \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    
    gcloud composer environments storage data delete \
        pools.json \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    

步驟 5:從 Airflow 1.10.* 環境值區轉移其他資料

gcloud

  1. 外掛程式轉移至 Airflow 2 環境。如要這麼做,請將外掛程式從 Airflow 1.10.* 環境 bucket 匯出至 Airflow 2 環境 bucket 中的 /plugins 資料夾:

    gcloud composer environments storage plugins export \
        --destination=AIRFLOW_2_BUCKET/plugins \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    
  2. 確認 /plugins 資料夾是否已成功匯入:

    gcloud composer environments storage plugins list \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
  3. 將 Airflow 1.10.* 環境中的 /data 資料夾匯出至 Airflow 2 環境:

        gcloud composer environments storage data export \
            --destination=AIRFLOW_2_BUCKET/data \
            --environment=AIRFLOW_1_ENV \
            --location=AIRFLOW_1_LOCATION
    
  4. 確認 /data 資料夾是否已成功匯入:

    gcloud composer environments storage data list \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    

步驟 6:轉移連線和使用者

Airflow 1.10.* 不支援匯出使用者和連線。如要轉移使用者和連線,請在 Airflow 2 環境中手動建立新的使用者帳戶和連線。

gcloud

  1. 如要取得 Airflow 1.10.* 環境中的連線清單,請執行:

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         connections -- --list
    
  2. 如要在 Airflow 2 環境中建立新連線,請透過 gcloud 執行 connections Airflow CLI 指令。例如:

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        connections add \
        -- --conn-host postgres.example.com \
        --conn-port 5432 \
        --conn-type postgres \
        --conn-login example_user \
        --conn-password example_password \
        --conn-description "Example connection" \
        example_connection
    
  3. 如要查看 Airflow 1.10.* 環境中的使用者清單,請按照下列步驟操作:

    1. 開啟 Airflow 1.10.* 環境的 Airflow 網頁介面

    2. 依序前往「管理」 >「使用者」

  4. 如要在 Airflow 2 環境中建立新使用者帳戶,請透過 gcloud 執行 users create Airflow CLI 指令。例如:

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        users create \
        -- --username example_username \
        --firstname Example-Name \
        --lastname Example-Surname \
        --email example-user@example.com \
        --use-random-password \
        --role Admin
    

步驟 7:確認 DAG 已準備好在 Airflow 2 中執行

將 DAG 轉移至 Airflow 2 環境前,請確認下列事項:

  1. DAG 執行作業成功,且沒有其他相容性問題。

  2. DAG 使用正確的匯入陳述式

    舉例來說,BigQueryCreateDataTransferOperator 的新匯入陳述式可能如下所示:

    from airflow.providers.google.cloud.operators.bigquery_dts \
        import BigQueryCreateDataTransferOperator
    
  3. 您的 DAG 已升級為 Airflow 2。這項變更與 Airflow 1.10.14 以上版本相容。

步驟 8:將 DAG 轉移至 Airflow 2 環境

在環境之間轉移 DAG 時,可能會發生下列潛在問題:

  • 如果兩個環境都啟用 DAG (未暫停),每個環境都會按照排定的時間執行自己的 DAG 副本。這可能會導致相同資料和執行時間的 DAG 同時執行。

  • 由於 DAG 補追,Airflow 會從 DAG 中指定的開始日期開始,排定額外的 DAG 執行作業。這是因為新的 Airflow 執行個體不會將 1.10.* 環境中的 DAG 執行記錄納入考量。這可能會導致系統從指定開始日期起排定大量 DAG 執行作業。

防止 DAG 同時執行

在 Airflow 2 環境中,覆寫 dags_are_paused_at_creation Airflow 設定選項。變更後,所有新的 DAG 都會預設暫停。

區段
core dags_are_paused_at_creation True

避免 DAG 執行作業過多或過少

在轉移至 Airflow 2 環境的 DAG 中,指定新的靜態開始日期

為避免執行日期出現間隔和重疊,下一次排定間隔發生時,應在 Airflow 2 環境中執行第一個 DAG。如要這麼做,請在 DAG 中將新的開始日期設為 Airflow 1.10.* 環境中上次執行的日期之前。

舉例來說,假設您的 DAG 每天會在 Airflow 1.10.* 環境中於 15:00、17:00 和 21:00 執行,而您打算在 15:15 轉移 DAG,則 Airflow 2 環境的開始日期可以是當天 14:45。在 Airflow 2 環境中啟用 DAG 後,Airflow 會排定在 17:00 執行 DAG。

再舉一例,假設您的 DAG 每天會在 Airflow 1.10.* 環境中於 00:00 執行,最後一次執行 DAG 是在 2021 年 4 月 26 日 00:00,且您打算在 2021 年 4 月 26 日 13:00 轉移 DAG,則 Airflow 2 環境的開始日期可以是 2021 年 4 月 25 日 23:45。在 Airflow 2 環境中啟用 DAG 後,Airflow 會排定在 2021 年 4 月 27 日 00:00 執行 DAG。

將 DAG 逐一轉移至 Airflow 2 環境

請針對每個 DAG 按照下列程序進行轉移:

  1. 請確認 DAG 中的新開始日期已如上一節所述設定。

  2. 將更新後的 DAG 上傳至 Airflow 2 環境。由於設定覆寫,這個 DAG 已在 Airflow 2 環境中暫停,因此目前尚未排定任何 DAG 執行作業。

  3. Airflow 網頁介面中,前往「DAGs」,檢查是否有回報的 DAG 語法錯誤。

  4. 在您打算轉移 DAG 時:

    1. 在 Airflow 1.10.* 環境中暫停 DAG。

    2. 在 Airflow 2 環境中取消暫停 DAG。

    3. 確認新的 DAG 執行作業是否已排定在正確的時間執行。

    4. 等待 Airflow 2 環境執行 DAG,並檢查是否成功。

  5. 視 DAG 執行作業是否成功而定:

    • 如果 DAG 執行作業成功,即可繼續操作,並從 Airflow 2 環境使用 DAG。最後,請考慮刪除 Airflow 1.10.* 版本的 DAG。

    • 如果 DAG 執行作業失敗,請嘗試排解 DAG 問題,直到 DAG 在 Airflow 2 中順利執行為止。

      如有需要,您隨時可以還原為 Airflow 1.10.* 版 DAG:

      1. 在 Airflow 2 環境中暫停 DAG。

      2. 在 Airflow 1.10.* 環境中取消暫停 DAG。系統會為失敗的 DAG 執行作業排定新的執行作業,日期和時間與失敗的執行作業相同。

      3. 準備好繼續使用 Airflow 2 版本的 DAG 時,請調整開始日期,將新版 DAG 上傳至 Airflow 2 環境,然後重複上述程序。

步驟 9:監控 Airflow 2 環境

將所有 DAG 和設定轉移至 Airflow 2 環境後,請監控潛在問題、失敗的 DAG 執行作業,以及整體環境健康狀態。如果 Airflow 2 環境順利運作一段時間,即可移除 Airflow 1.10.* 環境。

後續步驟