スナップショットを使用して環境を Cloud Composer 2 に移行する(Airflow 1 から)

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

このページでは、DAG と既存の Cloud Composer 1 のデータと構成の Airflow 1 環境を、Cloud Composer 2(Airflow 2)に移行する方法について説明します。

この移行ガイドでは、スナップショット機能を使用します。

その他の移行ガイド

開始日 終了日 メソッド ガイド
Cloud Composer 1、Airflow 2 Cloud Composer 2、Airflow 2 並列処理、スナップショットを使用 移行ガイド(スナップショット)
Cloud Composer 1、Airflow 1 Cloud Composer 2、Airflow 2 並列処理、スナップショットを使用 このガイド(スナップショット)
Cloud Composer 1、Airflow 2 Cloud Composer 2、Airflow 2 並列処理、手動転送 手動移行ガイド
Cloud Composer 1、Airflow 1 Cloud Composer 2、Airflow 2 並列処理、手動転送 手動移行ガイド
Airflow 1 Airflow 2 並列処理、手動転送 手動移行ガイド

準備

  • スナップショットは、Cloud Composer 2 バージョン 2.0.9 以降でサポートされています。Cloud Composer 1 では、1.18.5 で環境スナップショットの保存がサポートされています。

  • Cloud Composer では、Cloud Composer 1 から Cloud Composer 2 への移し換え移行がサポートされています。Cloud Composer 1 から Cloud Composer 2 への置き換えアップグレードはできません。

  • Cloud Composer 1 と Cloud Composer 2 の相違点の一覧を確認してください。

  • スナップショットをサポートする Airflow データベースの最大サイズは 20 GB です。環境のデータベースが 20 GB を超える場合は、Airflow データベースのサイズを縮小します。

  • スナップショットを作成するには、環境のバケット内の /dags/plugins/data フォルダ内のオブジェクトの合計数が 100,000 未満にする必要があります。

  • XCom メカニズムを使用してファイルを転送する場合は、Airflow のガイドラインに従って使用するようにしてください。XCom を使用して大きなファイルや大量のファイルを転送すると、Airflow データベースのパフォーマンスに影響し、スナップショットの読み込みや環境のアップグレード時に障害が発生する可能性があります。大量のデータを転送するには、Cloud Storage などの代替手段の使用を検討してください。

  • Cloud Composer 2 では Airflow 2 が使用されるため、移行には DAG と環境構成の Airflow 2 への切り替えが含まれます。Cloud Composer における Airflow 1 から Airflow 2 への互換性を失う変更については、Airflow 1 から Airflow 2 への移行ガイドをご覧ください。

  • このガイドは、Airflow 2 への移行と Cloud Composer 2 への移行を組み合わせて 1 つの移行手順にしています。これにより、Cloud Composer 2 に移行する前に、Airflow 2 がある Cloud Composer 1 環境に移行する必要はありません。

ステップ 1: Airflow 1.10.15 にアップグレードする

環境で 1.10.15 より前の Airflow バージョンを使用している場合は、Airflow 1.10.15 を使用し、スナップショットをサポートする Cloud Composer のバージョン環境をアップグレードします。

ステップ 2: Airflow 2 との互換性を確認する

Airflow 2 との競合の可能性を確認するには、Airflow 2.0+ へのアップグレード ガイドの DAG のアップグレードに関するセクションをご覧ください。

発生する可能性のある一般的な問題の 1 つは、互換性のないインポート パスに関するものです。この互換性の問題の解決について詳しくは、Airflow 2.0+ へのアップグレード ガイドのバックポート プロバイダに関するセクションをご覧ください。

ステップ 3: DAG が Airflow 2 に対応する準備ができていることを確認する

DAG を Cloud Composer 2 環境に移行する前に、次のことを確認してください。

  1. DAG が正常に実行され、互換性の問題が残っていない。

  2. DAG で正しいインポート ステートメントを使用している。

    たとえば、BigQueryCreateDataTransferOperator の新しいインポート ステートメントは次のようになります。

    from airflow.providers.google.cloud.operators.bigquery_dts \
        import BigQueryCreateDataTransferOperator
    
  3. DAG が Airflow 2 用にアップグレードされます。この変更は、Airflow 1.10.14 以降のバージョンと互換性があります。

ステップ 4: Cloud Composer 1 環境で DAG を一時停止する

DAG の重複実行を回避するには、スナップショットを保存する前に Cloud Composer 1 環境内のすべての DAG を一時停止します。

次のいずれかのオプションを使用できます。

  • Airflow ウェブ インターフェースで、[DAG] に移動し、すべての DAG を手動で一時停止します。

  • composer_dags スクリプトを使用して、すべての DAG を一時停止します。

    python3 composer_dags.py --environment COMPOSER_1_ENV \
      --project PROJECT_ID \
      --location COMPOSER_1_LOCATION \
      --operation pause
    

    次のように置き換えます。

    • COMPOSER_1_ENV は、Cloud Composer 1 環境の名前。
    • PROJECT_ID は、プロジェクト ID に置き換えます。
    • COMPOSER_1_LOCATION は、環境が配置されているリージョン。

ステップ 5: Cloud Composer 1 環境のスナップショットを保存する

コンソール

環境のスナップショットを作成します。

  1. Google Cloud Console で [環境] ページに移動します。

    [環境] に移動

  2. 環境のリストで、ご利用の Cloud Composer 1 環境の名前をクリックします。[環境の詳細] ページが開きます。

  3. [スナップショットを作成] をクリックします。

  4. [スナップショットを作成] ダイアログで、[送信] をクリックします。このガイドでは、スナップショットを Cloud Composer 1 環境のバケットに保存しますが、必要に応じて別のロケーションを選択することもできます。

  5. Cloud Composer がスナップショットを作成するまで待ちます。

gcloud

  1. Cloud Composer 1 環境のバケット URI を取得します。

    1. 次のコマンドを実行します。

      gcloud composer environments describe COMPOSER_1_ENV \
          --location COMPOSER_1_LOCATION \
           --format="value(config.dagGcsPrefix)"
      

      次のように置き換えます。

      • COMPOSER_1_ENV は、Cloud Composer 1 環境の名前に置き換えます。
      • COMPOSER_1_LOCATION は、環境が配置されているリージョン。
    2. 出力で、/dags フォルダを削除します。結果は、Cloud Composer 1 環境のバケットの URI です。

      たとえば、gs://us-central1-example-916807e1-bucket/dagsgs://us-central1-example-916807e1-bucket に変更します。

  2. Cloud Composer 1 環境のスナップショットを作成します。

    gcloud composer environments snapshots save \
      COMPOSER_1_ENV \
      --location COMPOSER_1_LOCATION \
      --snapshot-location "COMPOSER_1_SNAPSHOTS_FOLDER"
    

    次のように置き換えます。

    • COMPOSER_1_ENV は、Cloud Composer 1 環境の名前。
    • COMPOSER_1_LOCATION は、Cloud Composer 1 環境が配置されているリージョン。
    • COMPOSER_1_SNAPSHOTS_FOLDER は、Cloud Composer 1 環境のバケットの URI。このガイドでは、スナップショットを Cloud Composer 1 環境のバケットに保存しますが、必要に応じて別の場所を選択することもできます。カスタム ロケーションを指定する場合は、両方の環境のサービス アカウントに、指定したロケーションに対する読み取りと書き込みの権限が必要です。

ステップ 6: Cloud Composer 2 環境を作成する

Cloud Composer 2 環境を作成します。想定されるリソース需要を満たす環境プリセットから始めて、後で環境をさらにスケーリングして最適化できます。

構成のオーバーライドと環境変数は、後で Cloud Composer 1 環境のスナップショットを読み込むときに置き換えるため、指定する必要はありません。

Airflow 1 の一部の構成オプションでは、Airflow 2 とは別の名前とセクションを使用します。詳細については、構成の変更をご覧ください。

ステップ 7: スナップショットを Cloud Composer 2 環境に読み込む

コンソール

スナップショットを Cloud Composer 2 環境に読み込むには:

  1. Google Cloud Console で [環境] ページに移動します。

    [環境] に移動

  2. 環境のリストで、ご利用の Cloud Composer 2 環境の名前をクリックします。[環境の詳細] ページが開きます。

  3. [スナップショットを読み込む] をクリックします。

  4. [スナップショットを読み込む] ダイアログで、[参照] をクリックします。

  5. スナップショットのあるフォルダを選択します。このガイドのデフォルトの場所を使用する場合、このフォルダは Cloud Composer 1 環境バケットの /snapshots フォルダにあり、名前はスナップショット保存オペレーションのタイムスタンプです。例: us-central1-example-916807e1-bucket/snapshots_example-project_us-central1_example-environment/2022-01-05T18-59-00

  6. [読み込む] をクリックし、Cloud Composer によってスナップショットが読み込まれるまで待ちます。

gcloud

Cloud Composer 1 環境のスナップショットを Cloud Composer 2 環境に読み込みます。

gcloud composer environments snapshots load \
  COMPOSER_2_ENV \
  --location COMPOSER_2_LOCATION \
  --snapshot-path "SNAPSHOT_PATH"

次のように置き換えます。

  • COMPOSER_2_ENV は、Cloud Composer 2 環境の名前。
  • COMPOSER_2_LOCATION は、Cloud Composer 2 環境が配置されているリージョン。
  • SNAPSHOT_PATH を、Cloud Composer 1 環境のバケットの URI の後にスナップショットのパスを付加したものに置き換えます。例: gs://us-central1-example-916807e1-bucket/snapshots/example-project_us-central1_example-environment_2022-01-05T18-59-00

ステップ 8: Cloud Composer 2 環境で DAG の一時停止を解除する

次のいずれかのオプションを使用できます。

  • Airflow ウェブ インターフェースで、[DAG] に移動し、すべての DAG の一時停止を手動で 1 つずつ解除します。

  • composer_dags スクリプトを使用して、すべての DAG の一時停止を解除します。

      python3 composer_dags.py --environment COMPOSER_2_ENV \
      --project PROJECT_ID \
      --location COMPOSER_2_LOCATION \
      --operation unpause
    

    次のように置き換えます。

    • COMPOSER_2_ENV は、Cloud Composer 2 環境の名前。
    • PROJECT_ID は、プロジェクト ID に置き換えます。
    • COMPOSER_2_LOCATION は、環境が配置されているリージョン。

ステップ 9: DAG エラーを確認する

  1. Airflow ウェブ インターフェースで、[DAG] に移動し、報告された DAG 構文エラーを確認します。

  2. DAG 実行が正しい時間にスケジュール設定されていることを確認します。

  3. Cloud Composer 2 環境で DAG の実行が発生するのを待ち、成功したかどうかを確認します。DAG の実行が成功した場合は、Cloud Composer 1 環境で一時停止を解除しないでください。解除すると、DAG が Cloud Composer 1 の環境で同じ日時に実行されます。

  4. 特定の DAG の実行が失敗した場合は、Cloud Composer 2 で正常に実行されるまで DAG のトラブルシューティングを行います。

ステップ 10: Cloud Composer 2 環境をモニタリングする

すべての DAG と構成を Cloud Composer 2 環境に移行した後、潜在的な問題、失敗した DAG 実行、環境全体の健全性をモニタリングします。

Cloud Composer 2 環境が、十分な時間、問題なく動作している場合は、Cloud Composer 1 環境の削除を検討してください。

次のステップ