Airflow 2 に環境を移行する

Cloud Composer 1 | Cloud Composer 2

このページでは、DAG、データ、構成を既存の Airflow 1.10.* 環境から、Airflow 2 以降の Airflow バージョンを使用する環境に転送する方法について説明します。

その他の移行ガイド

移行元 終了日 メソッド ガイド
Cloud Composer 1、Airflow 2 Cloud Composer 2、Airflow 2 並列処理、スナップショットを使用 移行ガイド(スナップショット)
Cloud Composer 1、Airflow 1 Cloud Composer 2、Airflow 2 並列処理、スナップショットを使用 移行ガイド(スナップショット)
Cloud Composer 1、Airflow 2 Cloud Composer 2、Airflow 2 並列処理、手動転送 手動移行ガイド
Cloud Composer 1、Airflow 1 Cloud Composer 2、Airflow 2 並列処理、手動転送 手動移行ガイド
Airflow 1 Airflow 2 並列処理、手動転送 このガイド(手動移行)

移し換えアップグレード

Cloud Composer には、Airflow 1.10.14 および Airflow 1.10.15 を搭載した Cloud Composer 環境から、Airflow 2.0.1 以降の Airflow バージョンを搭載した既存の Cloud Composer 環境に、メタデータ・データベース、DAG、データ、プラグインを移行するための Cloud Composer データベース転送スクリプトが用意されています。

これは、このガイドで説明する方法に代わるものです。このガイドの一部は、用意されたスクリプトを使用する場合も引き続き適用されます。たとえば、Airflow 2 と DAG の互換性の確認を移行前に行うか、DAG の同時実行が発生しないようにし、余分な DAG の実行や DAG 実行の不足も起きないようにします。

始める前に

Airflow 2 とともに Cloud Composer 環境の使用を開始する前に、Airflow 2 が Cloud Composer 環境にもたらす変化について検討してください。

スケジューラ HA

環境内では、複数の Airflow スケジューラを使用できます。スケジューラの数は、環境を作成するときや、既存の環境を更新するときに設定できます。

Celery + Kubernetes Executor

Airflow 2 の Celery+Kubernetes Executor は現在のバージョンの Cloud Composer ではサポートされていません。

重要な変更

Airflow 2 では多くの大きな変更が導入され、その一部は最新機能です。

  • Airflow 1.10.* の既存の DAG が Airflow 2 で機能するとは限りません。テストを行い、場合によっては調整する必要があります。
  • プロバイダ パッケージに移行されるオペレーター、転送、フック。DAG のインポート ステートメントでは、新しいプロバイダ パッケージを使用する必要があります。古いインポート ステートメントは Airflow 2 で機能しなくなる可能性があります。
  • Airflow 2.では特定の構成オプションがサポートされないため、一部の Airflow 1.10.* 構成はサポートされなくなる可能性があります。
  • 一部のカスタム PyPI パッケージには、Airflow または Python の新しいバージョンとの互換性がない可能性があります。
  • アクセス制御のある Airflow UI が、デフォルトの Airflow 2 UI です。Airflow 2 では他の Airflow UI タイプをサポートしていません。
  • 試験運用版の REST API は安定している Airflow API に置き換えられました。試験運用版の REST API は Airflow 2 ではデフォルトで無効になっています。
  • Airflow 2.0.0 におけるその他の重要な変更
  • Airflow 2.0.1 におけるその他の重要な変更

Airflow 2 と Airflow 1.10.* の環境の違い

Airflow 1.10.* を使用する Cloud Composer 環境と Airflow 2 を使用する環境との主な違い

  • Airflow 2 を使用する環境では、Python 3.8 を使用します。これは、Airflow 1.10.* 環境で使用されているバージョンより新しいバージョンです。Python 2、Python 3.6、Python 3.7 はサポートされていません。
  • Airflow 2 は異なる CLI 形式を使用します。Cloud Composer では、gcloud composer environments run コマンドを通じて、Airflow 2 を使用する環境で新しい形式をサポートします。
  • プリインストールされた PyPI パッケージは、Airflow 2 環境で異なります。プリインストールされた PyPI パッケージの一覧については、Cloud Composer のバージョン リストをご覧ください。
  • Airflow 2 では、常に DAG のシリアル化が常に有効です。その結果、非同期 DAG 読み込みは不要になり、Airflow 2 ではサポートされません。 そのため、Airflow 2 では [core]store_serialized_dags パラメータと [core]store_dag_code パラメータの構成がサポートされておらず、このパラメータを設定しようとするとエラーとして報告されます。
  • Airflow ウェブサーバー プラグインはサポートされていません。これは、Airflow オペレーターやセンサーを含む、スケジューラやワーカーのプラグインには影響しません。
  • Airflow 2 環境では、デフォルトの Airflow ユーザーロールOp です。Airflow 1.10.* を使用する環境では、デフォルトのロールは Admin です。

ステップ 1: Airflow 2 との互換性を確認する

Airflow 2 との競合の可能性を確認するには、既存の Airflow 1.10.* 環境で Airflow が提供するアップグレード チェック スクリプトを使用します。

gcloud

  1. Airflow 1.10.14 以前のバージョンを環境で使用している場合は、Airflow 1.10.15 以降を使用する Cloud Composer のバージョン環境をアップグレードします。Cloud Composer は、Airflow 1.10.15 以降のアップグレード チェック コマンドをサポートしています。

  2. gcloud composer environments run コマンドを使用してアップグレード チェックを実行します。スタンドアロンの Airflow 1.10.15 に関連する一部のアップグレード チェックは Cloud Composer とは関係ありません。次のコマンドは、これらのチェックを除外します。

    gcloud composer environments run \
        AIRFLOW_1_ENV  \
        --location=AIRFLOW_1_LOCATION \
        upgrade_check \
        -- --ignore VersionCheckRule --ignore LoggingConfigurationRule \
        --ignore PodTemplateFileRule --ignore SendGridEmailerMovedRule
    

    以下のように置き換えます。

    • AIRFLOW_1_ENV は、Airflow 1.10.* 環境の名前に置き換えます。
    • AIRFLOW_1_LOCATION は、環境が配置されているリージョン。
  3. コマンドの出力を確認します。更新チェック スクリプトは、既存の環境での潜在的な互換性の問題を報告します。

  4. Airflow 2.0+ のアップグレード ガイドの DAG のアップグレードについてのセクションで説明されているように、DAG に対するその他の変更を実装します。

ステップ 2: Airflow 2 環境を作成し、構成のオーバーライドと環境変数を転送する

Airflow 2 環境を作成し、構成のオーバーライドと環境変数を転送する

  1. 環境の作成の手順に従います。環境を作成する前に、詳細で説明するように、構成のオーバーライドと環境変数も指定します。

  2. イメージを選択するときに、Airflow 2 を使用するイメージを選択します。

  3. 手動で Airflow 1.10.* 環境から新しい Airflow 2 環境へ構成パラメータを転送します。

    Console

    1. 環境を作成するときに、ネットワーキング、Airflow 構成のオーバーライド、その他の機能セクションを展開します。

    2. [Airflow 構成のオーバーライド] で、[Airflow 構成のオーバーライドを追加] をクリックします。

    3. Airflow 1.10.* 環境からすべての構成オーバーライドをコピーします。

      一部の構成オプションでは、Airflow 2 で別の名前とセクションを使用します。詳細については、構成の変更をご覧ください。

    4. [環境変数] で、[環境変数を追加] をクリックします。

    5. Airflow 1.10.* 環境からすべての環境変数をコピーします。

    6. [作成] をクリックして環境を作成します。

ステップ 3: Airflow 2 環境に PyPI パッケージをインストールする

Airflow 2 環境を作成したら、PyPI パッケージをインストールします。

Console

  1. Google Cloud Console で [環境] ページに移動します。

    [環境] に移動

  2. Airflow 2 環境を選択します。

  3. [PyPI パッケージ] タブに移動し、[編集] をクリックします。

  4. Airflow 1.10.* 環境から PyPI パッケージの要件をコピーします。[保存] をクリックして、環境が更新されるまで待ちます。

    Airflow 2 環境では、プリインストールされたパッケージの別のセットと別の Python バージョンを使用するため、解決が困難な PyPI パッケージの競合が発生することがあります。パッケージの依存関係の問題を診断する方法の 1 つは、Airflow ワーカー Pod にパッケージをインストールして PyPI パッケージ エラーを確認することです。

ステップ 4: 変数とプールを Airflow 2 に転送する

Airflow 1.10.* では、変数とプールを JSON ファイルにエクスポートできます。これらのファイルは Airflow 2 にインポートできます。

default_pool 以外のカスタムプールがある場合にのみ、プールを移行する必要があります。それ以外の場合は、プールのエクスポートとインポートを行うコマンドをスキップします。

gcloud

  1. Airflow 1.10.* 環境から変数をエクスポートします。

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         variables -- -e /home/airflow/gcs/data/variables.json
    

    以下のように置き換えます。

    • AIRFLOW_1_ENV は、Airflow 1.10.* 環境の名前に置き換えます。
    • AIRFLOW_1_LOCATION は、環境が配置されているリージョン。
  2. Airflow 1.10.* 環境からプールをエクスポートします。

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         pool -- -e /home/airflow/gcs/data/pools.json
    
  3. Airflow 2 環境のバケット URI を取得します。

    1. 次のコマンドを実行します。

      gcloud composer environments describe AIRFLOW_2_ENV \
          --location AIRFLOW_2_LOCATION \
           --format="value(config.dagGcsPrefix)"
      

      以下のように置き換えます。

      • AIRFLOW_2_ENV は、Airflow 2 環境の名前に置き換えます。
      • AIRFLOW_2_LOCATION は、環境が配置されているリージョン。
    2. 出力で、/dags フォルダを削除します。結果は Airflow 2 環境バケットの URI です。

      たとえば、gs://us-central1-example-916807e1-bucket/dagsgs://us-central1-example-916807e1-bucket に変更します。

  4. 変数とプールを含む JSON ファイルを Airflow 2 環境に転送します。

    gcloud composer environments storage data export \
        --destination=AIRFLOW_2_BUCKET/data \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION \
        --source=variables.json
    
    gcloud composer environments storage data export \
        --destination=AIRFLOW_2_BUCKET/data \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION \
        --source=pools.json
    

    AIRFLOW_2_BUCKET は、前のステップで取得した Airflow 2 環境バケットの URI に置き換えます。

  5. 変数とプールを Airflow 2 にインポートします。

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        variables import \
        -- /home/airflow/gcs/data/variables.json
    
    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        pools import \
        -- /home/airflow/gcs/data/pools.json
    
  6. 変数とプールがインポートされたことを確認します。

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        variables list
    
    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        pools list
    
  7. バケットから JSON ファイルを削除します。

    gcloud composer environments storage data delete \
        variables.json \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
    gcloud composer environments storage data delete \
        pools.json \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
    gcloud composer environments storage data delete \
        variables.json \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    
    gcloud composer environments storage data delete \
        pools.json \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    

ステップ 5: Airflow 1.10.* 環境バケットから他のデータを転送する

gcloud

  1. プラグインを Airflow 2 環境に転送します。これを行うには、プラグインを Airflow 1.10.* 環境バケットから Airflow 2 環境バケットの /plugins フォルダにエクスポートします。

    gcloud composer environments storage plugins export \
        --destination=AIRFLOW_2_BUCKET/plugins \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    
  2. /plugins フォルダが正常にインポートされたことを確認します。

    gcloud composer environments storage plugins list \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
  3. Airflow 1.10.* 環境から Airflow 2 環境に /data フォルダをエクスポートします。

        gcloud composer environments storage data export \
            --destination=AIRFLOW_2_BUCKET/data \
            --environment=AIRFLOW_1_ENV \
            --location=AIRFLOW_1_LOCATION
    
  4. /data フォルダが正常にインポートされたことを確認します。

    gcloud composer environments storage data list \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    

ステップ 6: 接続とユーザーを転送する

Airflow 1.10.* ではユーザーのエクスポートと接続がサポートされません。ユーザーと接続を転送するには、Airflow 2 環境で新しいユーザー アカウントと接続を手動で作成します。

gcloud

  1. Airflow 1.10.* 環境の接続を一覧表示するには、次のコマンドを実行します。

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         connections -- --list
    
  2. Airflow 2 環境で新しい接続を作成するには、gcloud で connections Airflow CLI コマンドを実行します。次に例を示します。

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        connections add \
        -- --conn-host postgres.example.com \
        --conn-port 5432 \
        --conn-type postgres \
        --conn-login example_user \
        --conn-password example_password \
        --conn-description "Example connection" \
        example_connection
    
  3. Airflow 1.10.* 環境内のユーザーの一覧を表示するには:

    1. Airflow 1.10.* 環境の Airflow ウェブ インターフェースを開きます

    2. [管理] > [ユーザー] に移動します。

  4. Airflow 2 環境で新しいユーザー アカウントを作成するには、gcloud で users create Airflow CLI コマンドを実行します。次に例を示します。

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        users create \
        -- --username example_username \
        --firstname Example-Name \
        --lastname Example-Surname \
        --email example-user@example.com \
        --use-random-password \
        --role Admin
    

ステップ 7: DAG が Airflow 2 に対応する準備ができていることを確認する

DAG を Airflow 2 環境に転送する前に、次のことを確認してください。

  1. DAG に対するアップグレード チェック スクリプトが正常に実行され、互換性の問題が残っていない。

  2. DAG で正しいインポート ステートメントを使用している。

    たとえば、BigQueryCreateDataTransferOperator の新しいインポート ステートメントは次のようになります。

    from airflow.providers.google.cloud.operators.bigquery_dts \
        import BigQueryCreateDataTransferOperator
    
  3. DAG が Airflow 2 用にアップグレードされます。この変更は、Airflow 1.10.14 以降のバージョンと互換性があります。

ステップ 8: DAG を Airflow 2 環境に転送する

DAG を環境間で転送すると、次の問題が発生する場合があります。

  • 両方の環境で DAG が有効である(一時停止されていない)場合、各環境はスケジュールされたとおりに DAG の独自のコピーを実行します。これにより、同じデータと実行時間に対して同時 DAG 実行が発生する可能性があります。

  • DAG のキャッチアップのために、Airflow は DAG で指定された開始日に始まる追加の DAG 実行をスケジュールします。これは、新しい Airflow インスタンスで 1.10.* 環境からの DAG の実行履歴が考慮されていないためです。このため、指定した開始日に始まるように大量の DAG 実行がスケジュールされる可能性があります。

同時 DAG 実行を防止する

Airflow 2 環境では、Airflow 構成オプション dags_are_paused_at_creation をオーバーライドします。この変更を行うと、デフォルトで新しいすべての DAG が一時停止されます。

セクション キー
core dags_are_paused_at_creation True

DAG 実行の追加や欠落を防ぐ

Airflow 2 環境に転送する DAG で新しい静的開始日を指定します。

実行日のギャップと重複を避けるため、最初の DAG 実行はスケジュール間隔の次の発生時に Airflow 2 環境で行う必要があります。そのためには、DAG 内の新しい開始日を、Airflow 1.10.* 環境での最後の実行日よりも前の日付に設定します。

たとえば、Airflow 1.10 環境で DAG が毎日 15:00、17:00、21:00 に実行され、最後の DAG 実行が 15:00 に発生し、DAG を 15:15 に転送するように計画する場合、Airflow 2 環境の開始日を今日の 14:45 に設定できます。Airflow 2 環境で DAG を有効にすると、DAG の実行が 17:00 にスケジュール設定されます。

別の例として、Airflow 1.10.* 環境で DAG が毎日 00:00 に実行され、最後の DAG 実行が 2021 年 4 月 26 日の 00:00 に発生し、DAG を 2021 年 4 月 26 日の 13:00 に転送するように計画する場合、Airflow 2 環境の開始日を 2021 年 4 月 25 日の 23:45 に設定できます。Airflow 2 環境で DAG を有効にすると、DAG の実行が 2021 年 4 月 27 日の 00:00 にスケジュール設定されます。

DAG を 1 つずつ Airflow 2 環境に転送する

DAG ごとに、次の手順に従って転送します。

  1. DAG の新しい開始日が前のセクションの説明に従って設定されていることを確認します。

  2. Airflow 2 環境に更新された DAG をアップロードします。この DAG は、構成のオーバーライドのために Airflow 2 環境で一時停止されているため、DAG 実行はまだスケジュールされていません。

  3. Airflow ウェブ インターフェースで [DAG] に移動し、報告された DAG 構文エラーを確認します。

  4. DAG の転送を計画する時刻:

    1. Airflow 1.10.* 環境で DAG を一時停止します。

    2. Airflow 2 環境で DAG の一時停止を解除します。

    3. 新しい DAG 実行が正しい時間にスケジュール設定されていることを確認します。

    4. Airflow 2 環境で DAG の実行が発生するまで待機し、実行が成功したかどうかを確認します。

  5. DAG の実行が成功するかどうかによって、以下にようになります。

    • DAG の実行が成功した場合、続行して、Airflow 2 環境から DAG を使用できます。最終的には、Airflow 1.10.* バージョンの DAG を削除することを検討してください。

    • DAG 実行が失敗した場合は、Airflow 2 で正常に実行されるまで DAG のトラブルシューティングを試行してください。

      必要に応じて、いつでも Airflow 1.10.* バージョンの DAG にフォールバックできます。

      1. Airflow 2 環境で DAG を一時停止します。

      2. Airflow 1.10.* 環境で DAG の一時停止を解除します。これにより、新しい DAG 実行が、失敗した DAG 実行と同じ日時にスケジュール設定されます。

      3. Airflow 2 バージョンの DAG の使用を続行する準備ができたら、開始日を調整し、新しいバージョンの DAG を Airflow 2 環境にアップロードして、手順を繰り返します。

ステップ 9: Airflow 2 環境をモニタリングする

すべての DAG と構成を Airflow 2 環境に転送した後、潜在的な問題、失敗した DAG 実行、環境全体の健全性をモニタリングします。Airflow 2 環境が一定期間問題のなく動作している場合は、Airflow 1.10.* 環境を削除できます。

次のステップ