環境を Cloud Composer 2 に移行する(Airflow 1 から)

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

このページでは、DAG と既存の Cloud Composer 1 のデータと構成の Airflow 1 環境を、Cloud Composer 2(Airflow 2)に移行する方法について説明します。

その他の移行ガイド

開始日 終了日 メソッド ガイド
Cloud Composer 1、Airflow 2 Cloud Composer 2、Airflow 2 並列処理、スナップショットを使用 移行ガイド(スナップショット)
Cloud Composer 1、Airflow 1 Cloud Composer 2、Airflow 2 並列処理、スナップショットを使用 移行ガイド(スナップショット)
Cloud Composer 1、Airflow 2 Cloud Composer 2、Airflow 2 並列処理、手動転送 手動移行ガイド
Cloud Composer 1、Airflow 1 Cloud Composer 2、Airflow 2 並列処理、手動転送 このガイド(手動移行)
Airflow 1 Airflow 2 並列処理、手動転送 手動移行ガイド

始める前に

  • Cloud Composer では、Cloud Composer 1 から Cloud Composer 2 への移し換え移行がサポートされています。Cloud Composer 1 から Cloud Composer 2 への置き換えアップグレードはできません。
  • Cloud Composer 1 と Cloud Composer 2 の相違点の一覧を確認してください。
  • Cloud Composer 2 では Airflow 2 が使用されるため、移行には DAG と環境構成の Airflow 2 への切り替えが含まれます。Cloud Composer における Airflow 1 から Airflow 2 への互換性を失う変更については、Airflow 1 から Airflow 2 への移行ガイドをご覧ください。

  • このガイドは、Airflow 2 への移行と Cloud Composer 2 への移行を組み合わせて 1 つの移行手順にしています。これにより、Cloud Composer 2 に移行する前に、Airflow 2 がある Cloud Composer 1 環境に移行する必要はありません。

ステップ 1: Airflow 1.10.15 にアップグレードする

環境で 1.10.15 より前の Airflow バージョンを使用している場合は、Airflow 1.10.15 を使用する Cloud Composer のバージョン環境をアップグレードします。

ステップ 2: Airflow 2 との互換性を確認する

Airflow 2 との競合の可能性を確認するには、Airflow 2.0+ へのアップグレード ガイドの DAG のアップグレードに関するセクションをご覧ください。

発生する可能性のある一般的な問題の 1 つは、互換性のないインポート パスに関するものです。この互換性の問題の解決について詳しくは、Airflow 2.0+ へのアップグレード ガイドのバックポート プロバイダに関するセクションをご覧ください。

ステップ 3: 構成のオーバーライド、カスタム PyPI パッケージ、環境変数のリストを取得する

Console

Cloud Composer 1 環境の、構成のオーバーライド、カスタム PyPI パッケージ、環境変数のリストを取得します。

  1. Google Cloud Console の [環境] ページに移動します。

    [環境] に移動

  2. Cloud Composer 1 環境を選択します。

  3. [環境変数] タブで、環境変数を表示します。

  4. [Airflow 構成のオーバーライド] タブで、構成のオーバーライドを表示します。

  5. [PyPI パッケージ] タブで、カスタム PyPI パッケージを表示します。

gcloud

環境変数のリストを取得するには、次のコマンドを実行します。

gcloud composer environments describe \
    COMPOSER_1_ENV \
    --location COMPOSER_1_LOCATION \
    --format="value(config.softwareConfig.envVariables)"

環境の Airflow 構成のオーバーライドのリストを取得するには、次のコマンドを実行します。

gcloud composer environments describe \
    COMPOSER_1_ENV \
    --location COMPOSER_1_LOCATION \
    --format="value(config.softwareConfig.airflowConfigOverrides)"

カスタム PyPI パッケージのリストを取得するには、次のコマンドを実行します。

gcloud composer environments describe \
    COMPOSER_1_ENV \
    --location COMPOSER_1_LOCATION \
    --format="value(config.softwareConfig.pypiPackages)"

次のように置き換えます。

  • COMPOSER_1_ENV は、Cloud Composer 1 環境の名前。
  • COMPOSER_1_LOCATION は、Cloud Composer 1 環境が配置されているリージョン。

Terraform

このステップをスキップします。Cloud Composer 1 環境の構成には、環境の構成のオーバーライド、カスタム PyPI パッケージ、環境変数がすでに記載されています。

ステップ 4: Cloud Composer 2 環境を作成する

このステップでは、Cloud Composer 2 環境を作成します。想定されるリソース需要を満たす環境プリセットから始めて、後で環境をさらにスケーリングして最適化できます。

Console

Cloud Composer 2 環境を作成し、構成のオーバーライドと環境変数を指定します。

それ以外の方法では、環境を作成した後に Airflow の構成と環境変数オーバーライドできます。

Airflow 1 の一部の構成オプションでは、Airflow 2 とは別の名前とセクションを使用します。詳細については、構成の変更をご覧ください。

gcloud

Cloud Composer 2 環境を作成し、構成のオーバーライドと環境変数を指定します。

それ以外の方法では、環境を作成した後に Airflow の構成と環境変数オーバーライドできます。

Airflow 1 の一部の構成オプションでは、Airflow 2 とは別の名前とセクションを使用します。詳細については、構成の変更をご覧ください。

Terraform

Cloud Composer 2 環境は、Cloud Composer 1 環境の構成に基づいて作成します。

  1. Cloud Composer 1 環境の構成をコピーします。
  2. 環境の名前を変更します。
  3. google-beta プロバイダを使用します。

    resource "google_composer_environment" "example_environment_composer_2" {
      provider = google-beta
      # ...
    }
    
  4. config.software_config ブロックに、Cloud Composer 2 イメージを指定します。

    software_config {
      image_version = "composer-2.9.11-airflow-2.9.3"
      # ...
    }
    
  5. まだ行っていない場合は、構成のオーバーライドと環境変数を指定します

  6. config.software_config.pypi_packages ブロックに、カスタム PyPI パッケージを指定します。

    software_config {
    
      # ...
    
      pypi_packages = {
        numpy = ""
        scipy = ">=1.1.0"
      }
    
    }
    

ステップ 5: PyPI パッケージを Cloud Composer 2 環境にインストールする

Cloud Composer 2 環境を作成したら、その環境にカスタム PyPI パッケージをインストールします。

Console

  1. Google Cloud Console の [環境] ページに移動します。

    [環境] に移動

  2. Cloud Composer 2 環境を選択します。

  3. [PyPI パッケージ] タブに移動し、[編集] をクリックします。

  4. Cloud Composer 1 環境から PyPI パッケージに必要なものをコピーします。[保存] をクリックして、環境が更新されるまで待ちます。

gcloud

  1. カスタム PyPI パッケージのリストを含む requirements.txt ファイルを作成します。

      numpy
      scipy>=1.1.0
    
  2. 環境を更新します。また、requirements.txt ファイルは、--update-pypi-packages-from-file コマンドに渡します。

    gcloud composer environments update COMPOSER_2_ENV \
      --location COMPOSER_2_LOCATION  \
      --update-pypi-packages-from-file requirements.txt
    

    次のように置き換えます。

    • COMPOSER_2_ENV は、Cloud Composer 2 環境の名前。
    • COMPOSER_2_LOCATION は、Cloud Composer 2 環境が配置されているリージョン。

Terraform

このステップをスキップします。カスタム PyPI パッケージは、環境の作成時にすでにインストールしています。

ステップ 6: 変数とプールを移行する

Airflow では、変数とプールを JSON ファイルにエクスポートできます。そうすると、これらのファイルを Cloud Composer 2 環境にインポートできます。

このステップで使用する Airflow CLI コマンドは、Airflow ワーカーのローカル ファイルに対して動作します。ファイルをアップロードまたはダウンロードするには、使用している環境の Cloud Storage バケットの /data フォルダを使用します。このフォルダは Airflow ワーカーの /home/airflow/gcs/data/ ディレクトリに同期されます。Airflow CLI コマンドで、FILEPATH パラメータに /home/airflow/gcs/data/ を指定します。

gcloud

  1. Cloud Composer 1 環境の変数をエクスポートします。

    gcloud composer environments run \
        COMPOSER_1_ENV \
        --location COMPOSER_1_LOCATION \
         variables -- -e /home/airflow/gcs/data/variables.json
    

    次のように置き換えます。

    • COMPOSER_1_ENV は、Cloud Composer 1 環境の名前。
    • COMPOSER_1_LOCATION は、Cloud Composer 1 環境が配置されているリージョン。
  2. Cloud Composer 1 環境のプールをエクスポートします。

    gcloud composer environments run COMPOSER_1_ENV \
        --location COMPOSER_1_LOCATION \
         pool -- -e /home/airflow/gcs/data/pools.json
    

    次のように置き換えます。

    • COMPOSER_1_ENV は、Cloud Composer 1 環境の名前。
    • COMPOSER_1_LOCATION は、Cloud Composer 1 環境が配置されているリージョン。
  3. Cloud Composer 2 環境のバケット URI を取得します。

    1. 次のコマンドを実行します。

      gcloud composer environments describe COMPOSER_2_ENV \
          --location COMPOSER_2_LOCATION \
           --format="value(config.dagGcsPrefix)"
      

      次のように置き換えます。

      • COMPOSER_2_ENV は、Cloud Composer 2 環境の名前。
      • COMPOSER_2_LOCATION は、環境が配置されているリージョン。
    2. 出力で、/dags フォルダを削除します。結果は、Cloud Composer 2 環境のバケットの URI です。

      たとえば、gs://us-central1-example-916807e1-bucket/dagsgs://us-central1-example-916807e1-bucket に変更します。

  4. 変数とプールを含む JSON ファイルを、Cloud Composer 2 環境に移行します。

    gcloud composer environments storage data export \
        --destination=COMPOSER_2_BUCKET/data \
        --environment=COMPOSER_1_ENV \
        --location=COMPOSER_1_LOCATION \
        --source=variables.json
    
    gcloud composer environments storage data export \
        --destination=COMPOSER_2_BUCKET/data \
        --environment=COMPOSER_1_ENV \
        --location=COMPOSER_1_LOCATION \
        --source=pools.json
    

    次のように置き換えます。

    • COMPOSER_2_BUCKET は、前の手順で取得した Cloud Composer 2 環境バケットの URI。
    • COMPOSER_1_ENV は、Cloud Composer 1 環境の名前。
    • COMPOSER_1_LOCATION は、Cloud Composer 1 環境が配置されているリージョン。
  5. 変数とプールを Cloud Composer 2 にインポートします。

    gcloud composer environments run \
        COMPOSER_2_ENV \
        --location COMPOSER_2_LOCATION \
        variables import \
        -- /home/airflow/gcs/data/variables.json
    
    gcloud composer environments run \
        COMPOSER_2_ENV \
        --location COMPOSER_2_LOCATION \
        pools import \
        -- /home/airflow/gcs/data/pools.json
    
  6. 変数とプールがインポートされたことを確認します。

    gcloud composer environments run \
        COMPOSER_2_ENV \
        --location COMPOSER_2_LOCATION \
        variables list
    
    gcloud composer environments run \
        COMPOSER_2_ENV \
        --location COMPOSER_2_LOCATION \
        pools list
    
  7. バケットから JSON ファイルを削除します。

    gcloud composer environments storage data delete \
        variables.json \
        --environment=COMPOSER_2_ENV \
        --location=COMPOSER_2_LOCATION
    
    gcloud composer environments storage data delete \
        pools.json \
        --environment=COMPOSER_2_ENV \
        --location=COMPOSER_2_LOCATION
    
    gcloud composer environments storage data delete \
        variables.json \
        --environment=COMPOSER_1_ENV \
        --location=COMPOSER_1_LOCATION
    
    gcloud composer environments storage data delete \
        pools.json \
        --environment=COMPOSER_1_ENV \
        --location=COMPOSER_1_LOCATION
    

ステップ 7: Cloud Composer 1 環境のバケットから他のデータを移行する

Cloud Composer 1 環境のバケットからプラグインや他のデータを移行します。

gcloud

  1. プラグインを Cloud Composer 2 環境に移行します。これを行うには、Cloud Composer 1 環境のバケットから Cloud Composer 2 環境のバケットにある /plugins フォルダに、プラグインをエクスポートします。

    gcloud composer environments storage plugins export \
        --destination=COMPOSER_2_BUCKET/plugins \
        --environment=COMPOSER_1_ENV \
        --location=COMPOSER_1_LOCATION
    
  2. /plugins フォルダが正常にインポートされたことを確認します。

    gcloud composer environments storage plugins list \
        --environment=COMPOSER_2_ENV \
        --location=COMPOSER_2_LOCATION
    
  3. /data フォルダを、Cloud Composer 1 環境から Airflow 2 環境にエクスポートします。

    gcloud composer environments storage data export \
        --destination=COMPOSER_2_BUCKET/data \
        --environment=COMPOSER_1_ENV \
        --location=COMPOSER_1_LOCATION
    
  4. /data フォルダが正常にインポートされたことを確認します。

    gcloud composer environments storage data list \
        --environment=COMPOSER_2_ENV \
        --location=COMPOSER_2_LOCATION
    

ステップ 8: 接続を移行する

Airflow 1.10.15 では、接続のエクスポートがサポートされていません。接続を移行するには、Cloud Composer 1 環境の接続を、Cloud Composer 2 環境に手動で作成します。

gcloud

  1. Cloud Composer 1 環境の接続のリストを取得するために、次のコマンドを実行します。

    gcloud composer environments run COMPOSER_1_ENV \
        --location COMPOSER_1_LOCATION \
         connections -- --list
    
  2. Cloud Composer 2 環境で新しい接続を作成するために、gcloud を使用して connections Airflow CLI コマンドを実行します。次に例を示します。

    gcloud composer environments run \
        COMPOSER_2_ENV \
        --location COMPOSER_2_LOCATION \
        connections add \
        -- --conn-host postgres.example.com \
        --conn-port 5432 \
        --conn-type postgres \
        --conn-login example_user \
        --conn-password example_password \
        --conn-description "Example connection" \
        example_connection
    

ステップ 9: ユーザー アカウントを移行する

このステップでは、手動で接続を作成してユーザーを移行する方法について説明します。

Airflow 1.10.15 では、ユーザーのエクスポートがサポートされていません。ユーザーと接続を移行するには、Cloud Composer 1 環境のユーザー アカウントを Airflow 2 環境に新しく手動で作成します。

Airflow UI

  1. Cloud Composer 1 環境でユーザーの一覧を表示するには、次のようにします。

    1. Cloud Composer 1 環境の Airflow ウェブ インターフェースを開きます

    2. [管理] > [ユーザー] に移動します。

  2. Cloud Composer 2 環境でユーザーを作成するには、次のようにします。

    1. Cloud Composer 2 環境の Airflow ウェブ インターフェースを開きます

    2. [セキュリティ] > [ユーザーを一覧表示] に移動します。

    3. [新しいレコードの追加] をクリックします。

gcloud

  1. Airflow 1 では、gcloud を使用してユーザーのリストを表示することはできません。Airflow UI を使用してください。

  2. Cloud Composer 2 環境で新しいユーザー アカウントを作成するには、gcloudusers create Airflow CLI コマンドを実行します。次に例を示します。

    gcloud composer environments run \
        COMPOSER_2_ENV \
        --location COMPOSER_2_LOCATION \
        users create \
        -- --username example_username \
        --firstname Example-Name \
        --lastname Example-Surname \
        --email example-user@example.com \
        --use-random-password \
        --role Op
    

    次のように置き換えます。

    • COMPOSER_2_ENV は、Cloud Composer 2 環境の名前。
    • COMPOSER_2_LOCATION は、Cloud Composer 2 環境が配置されているリージョン。
    • ユーザーのロールを含む、Cloud Composer 1 環境のすべてのユーザー構成パラメータとその値。

ステップ 10: DAG が Airflow 2 に対応する準備ができていることを確認する

DAG を Cloud Composer 1 環境に移行する前に、次のことを確認してください。

  1. DAG が正常に実行され、互換性の問題が残っていない。

  2. DAG で正しいインポート ステートメントを使用している。

    たとえば、BigQueryCreateDataTransferOperator の新しいインポート ステートメントは次のようになります。

    from airflow.providers.google.cloud.operators.bigquery_dts \
        import BigQueryCreateDataTransferOperator
    
  3. DAG が Airflow 2 用にアップグレードされます。この変更は、Airflow 1.10.14 以降のバージョンと互換性があります。

ステップ 11: DAG を Cloud Composer 2 環境に移行する

DAG を環境間で転送すると、次の問題が発生する場合があります。

  • 両方の環境で DAG が有効である(一時停止されていない)場合、各環境はスケジュールされたとおりに DAG の独自のコピーを実行します。これにより、同じデータと実行時間の重複した DAG 実行が発生する可能性があります。

  • DAG のキャッチアップのために、Airflow は DAG で指定された開始日に始まる追加の DAG 実行をスケジュールします。これは、新しい Airflow インスタンスで Cloud Composer 1 環境からの DAG 実行の履歴が考慮されないためです。このため、指定した開始日に始まるように大量の DAG 実行がスケジュールされる可能性があります。

DAG の重複実行を防止する

Cloud Composer 2 環境の Airflow 2 環境で、dags_are_paused_at_creation オプションに対する Airflow 構成オプションのオーバーライドを追加します。この変更を行うと、デフォルトで新しいすべての DAG が一時停止されます。

セクション キー
core dags_are_paused_at_creation True

DAG 実行の追加や欠落を防ぐ

実行日のギャップや重複を回避するには、Cloud Composer 2 でのキャッチアップを無効にします。このようにして、DAG を Cloud Composer 2 環境にアップロードすると、Airflow では Cloud Composer 1 環境ですでに実行されている DAG 実行がスケジュールされません。catchup_by_default オプションに対する Airflow 構成オプションのオーバーライドを追加します。

セクション キー
scheduler catchup_by_default False

DAG を Cloud Composer 2 環境に移行する

DAG を Cloud Composer 2 環境に移行するには、次のようにします。

  1. Cloud Composer 1 環境から Cloud Composer 2 環境に DAG をアップロードします。airflow_monitoring.py DAG をスキップします。

  2. 構成のオーバーライドにより、DAG は Cloud Composer 2 環境で一時停止され、DAG 実行はスケジュール設定されません。

  3. Airflow ウェブ インターフェースで、[DAG] に移動し、報告された DAG 構文エラーを確認します。

  4. DAG の転送を計画する時刻:

    1. Cloud Composer 1 環境で、DAG を一時停止します。

    2. Cloud Composer 2 環境で、DAG の一時停止を解除します。

    3. 新しい DAG 実行が正しい時間にスケジュール設定されていることを確認します。

    4. Cloud Composer 2 環境で DAG の実行が発生するのを待ち、成功したかどうかを確認します。DAG の実行が成功した場合は、Cloud Composer 1 環境で一時停止を解除しないでください。解除すると、DAG が Cloud Composer 1 の環境で同じ日時に実行されます。

  5. 特定の DAG の実行が失敗した場合は、Cloud Composer 2 で正常に実行されるまで DAG のトラブルシューティングを行います。

    必要な場合は、次のようにいつでも DAG の Cloud Composer 1 バージョンにフォールバックして、Cloud Composer 2 で失敗した DAG 実行を Cloud Composer 1 環境で実行できます。

    1. Cloud Composer 2 環境で DAG を一時停止します。

    2. Cloud Composer 1 環境で DAG の一時停止を解除します。これにより、Cloud Composer 1 環境で、一時停止された時間の DAG 実行を補う DAG がスケジュールされます。

ステップ 12: Cloud Composer 2 環境をモニタリングする

すべての DAG と構成を Cloud Composer 2 環境に移行した後、潜在的な問題、失敗した DAG 実行、環境全体の健全性をモニタリングします。Cloud Composer 2 環境が、十分な時間、問題なく動作している場合は、Cloud Composer 1 環境の削除を検討してください。

次のステップ