Cloud Composer 1 | Cloud Composer 2
このページでは、DAG と既存の Cloud Composer 1 のデータと構成の Airflow 1 環境を、Cloud Composer 2(Airflow 2)に移行する方法について説明します。
その他の移行ガイド
移行元 | 終了日 | メソッド | ガイド |
---|---|---|---|
Cloud Composer 1、Airflow 2 | Cloud Composer 2、Airflow 2 | 並列処理、スナップショットを使用 | 移行ガイド(スナップショット) |
Cloud Composer 1、Airflow 1 | Cloud Composer 2、Airflow 2 | 並列処理、スナップショットを使用 | 移行ガイド(スナップショット) |
Cloud Composer 1、Airflow 2 | Cloud Composer 2、Airflow 2 | 並列処理、手動転送 | 手動移行ガイド |
Cloud Composer 1、Airflow 1 | Cloud Composer 2、Airflow 2 | 並列処理、手動転送 | このガイド(手動移行) |
Airflow 1 | Airflow 2 | 並列処理、手動転送 | 手動移行ガイド |
始める前に
- Cloud Composer では、Cloud Composer 1 から Cloud Composer 2 への移し換え移行がサポートされています。Cloud Composer 1 から Cloud Composer 2 への置き換えアップグレードはできません。
- Cloud Composer 1 と Cloud Composer 2 の相違点の一覧を確認してください。
Cloud Composer 2 では Airflow 2 が使用されるため、移行には DAG と環境構成の Airflow 2 への切り替えが含まれます。Cloud Composer における Airflow 1 から Airflow 2 への互換性を失う変更については、Airflow 1 から Airflow 2 への移行ガイドをご覧ください。
このガイドは、Airflow 2 への移行と Cloud Composer 2 への移行を組み合わせて 1 つの移行手順にしています。これにより、Cloud Composer 2 に移行する前に、Airflow 2 がある Cloud Composer 1 環境に移行する必要はありません。
ステップ 1: Airflow 1.10.15 にアップグレードする
環境で 1.10.15 より前の Airflow バージョンを使用している場合は、Airflow 1.10.15 を使用する Cloud Composer のバージョンに環境をアップグレードします。
ステップ 2: Airflow 2 との互換性を確認する
Airflow 2 との競合の可能性を確認するには、既存の Airflow 1.10.15 環境で Airflow が提供するアップグレード チェック スクリプトを使用します。
gcloud
gcloud composer environments run
コマンドを使用してアップグレード チェックを実行します。スタンドアロンの Airflow 1.10.15 に関連する一部のアップグレード チェックは Cloud Composer とは関係ありません。次のコマンドは、これらのチェックを除外します。gcloud composer environments run \ COMPOSER_1_ENV \ --location=COMPOSER_1_LOCATION \ upgrade_check \ -- --ignore VersionCheckRule --ignore LoggingConfigurationRule \ --ignore PodTemplateFileRule --ignore SendGridEmailerMovedRule
次のように置き換えます。
COMPOSER_1_ENV
は、Airflow 1.10.15 環境の名前。COMPOSER_1_LOCATION
は、環境が配置されているリージョン。
コマンドの出力を確認します。更新チェック スクリプトにより、既存の環境での潜在的な互換性の問題が報告されます。
Airflow 2.0+ のアップグレード ガイドの DAG のアップグレードについてのセクションで説明されているように、DAG に対するその他の変更を実装します。
ステップ 3: 構成のオーバーライド、カスタム PyPI パッケージ、環境変数のリストを取得する
Console
Cloud Composer 1 環境の、構成のオーバーライド、カスタム PyPI パッケージ、環境変数のリストを取得します。
Google Cloud Console の [環境] ページに移動します。
Cloud Composer 1 環境を選択します。
[環境変数] タブで、環境変数を表示します。
[Airflow 構成のオーバーライド] タブで、構成のオーバーライドを表示します。
[PyPI パッケージ] タブで、カスタム PyPI パッケージを表示します。
gcloud
環境変数のリストを取得するには、次のコマンドを実行します。
gcloud composer environments describe \
COMPOSER_1_ENV \
--location COMPOSER_1_LOCATION \
--format="value(config.softwareConfig.envVariables)"
環境の Airflow 構成のオーバーライドのリストを取得するには、次のコマンドを実行します。
gcloud composer environments describe \
COMPOSER_1_ENV \
--location COMPOSER_1_LOCATION \
--format="value(config.softwareConfig.airflowConfigOverrides)"
カスタム PyPI パッケージのリストを取得するには、次のコマンドを実行します。
gcloud composer environments describe \
COMPOSER_1_ENV \
--location COMPOSER_1_LOCATION \
--format="value(config.softwareConfig.pypiPackages)"
次のように置き換えます。
COMPOSER_1_ENV
は、Cloud Composer 1 環境の名前。COMPOSER_1_LOCATION
は、Cloud Composer 1 環境が配置されているリージョン。
Terraform
このステップをスキップします。Cloud Composer 1 環境の構成には、環境の構成のオーバーライド、カスタム PyPI パッケージ、環境変数がすでに記載されています。
ステップ 4: Cloud Composer 2 環境を作成する
このステップでは、Cloud Composer 2 環境を作成します。想定されるリソース需要を満たす環境プリセットから始めて、後で環境をさらにスケーリングして最適化できます。
Console
Cloud Composer 2 環境を作成し、構成のオーバーライドと環境変数を指定します。
それ以外の方法では、環境を作成した後に Airflow の構成と環境変数をオーバーライドできます。
Airflow 1 の一部の構成オプションでは、Airflow 2 とは別の名前とセクションを使用します。詳細については、構成の変更をご覧ください。
gcloud
Cloud Composer 2 環境を作成し、構成のオーバーライドと環境変数を指定します。
それ以外の方法では、環境を作成した後に Airflow の構成と環境変数をオーバーライドできます。
Airflow 1 の一部の構成オプションでは、Airflow 2 とは別の名前とセクションを使用します。詳細については、構成の変更をご覧ください。
Terraform
Cloud Composer 2 環境は、Cloud Composer 1 環境の構成に基づいて作成します。
- Cloud Composer 1 環境の構成をコピーします。
- 環境の名前を変更します。
google-beta
プロバイダを使用します。resource "google_composer_environment" "example_environment_composer_2" { provider = google-beta # ... }
config.software_config
ブロックに、Cloud Composer 2 イメージを指定します。software_config { image_version = "composer-2.8.2-airflow-2.7.3" # ... }
まだ行っていない場合は、構成のオーバーライドと環境変数を指定します。
config.software_config.pypi_packages
ブロックに、カスタム PyPI パッケージを指定します。software_config { # ... pypi_packages = { numpy = "" scipy = ">=1.1.0" } }
ステップ 5: PyPI パッケージを Cloud Composer 2 環境にインストールする
Cloud Composer 2 環境を作成したら、その環境にカスタム PyPI パッケージをインストールします。
Console
Google Cloud Console の [環境] ページに移動します。
Cloud Composer 2 環境を選択します。
[PyPI パッケージ] タブに移動し、[編集] をクリックします。
Cloud Composer 1 環境から PyPI パッケージに必要なものをコピーします。[保存] をクリックして、環境が更新されるまで待ちます。
gcloud
カスタム PyPI パッケージのリストを含む
requirements.txt
ファイルを作成します。numpy scipy>=1.1.0
環境を更新します。また、
requirements.txt
ファイルは、--update-pypi-packages-from-file
コマンドに渡します。gcloud composer environments update COMPOSER_2_ENV \ --location COMPOSER_2_LOCATION \ --update-pypi-packages-from-file requirements.txt
次のように置き換えます。
COMPOSER_2_ENV
は、Cloud Composer 2 環境の名前。COMPOSER_2_LOCATION
は、Cloud Composer 2 環境が配置されているリージョン。
Terraform
このステップをスキップします。カスタム PyPI パッケージは、環境の作成時にすでにインストールしています。
ステップ 6: 変数とプールを移行する
Airflow では、変数とプールを JSON ファイルにエクスポートできます。そうすると、これらのファイルを Cloud Composer 2 環境にインポートできます。
このステップで使用する Airflow CLI コマンドは、Airflow ワーカーのローカル ファイルに対して動作します。ファイルをアップロードまたはダウンロードするには、使用している環境の Cloud Storage バケットの /data
フォルダを使用します。このフォルダは Airflow ワーカーの /home/airflow/gcs/data/
ディレクトリに同期されます。Airflow CLI コマンドで、FILEPATH
パラメータに /home/airflow/gcs/data/
を指定します。
gcloud
Cloud Composer 1 環境の変数をエクスポートします。
gcloud composer environments run \ COMPOSER_1_ENV \ --location COMPOSER_1_LOCATION \ variables -- -e /home/airflow/gcs/data/variables.json
次のように置き換えます。
COMPOSER_1_ENV
は、Cloud Composer 1 環境の名前。COMPOSER_1_LOCATION
は、Cloud Composer 1 環境が配置されているリージョン。
Cloud Composer 1 環境のプールをエクスポートします。
gcloud composer environments run COMPOSER_1_ENV \ --location COMPOSER_1_LOCATION \ pool -- -e /home/airflow/gcs/data/pools.json
次のように置き換えます。
COMPOSER_1_ENV
は、Cloud Composer 1 環境の名前。COMPOSER_1_LOCATION
は、Cloud Composer 1 環境が配置されているリージョン。
Cloud Composer 2 環境のバケット URI を取得します。
次のコマンドを実行します。
gcloud composer environments describe COMPOSER_2_ENV \ --location COMPOSER_2_LOCATION \ --format="value(config.dagGcsPrefix)"
次のように置き換えます。
COMPOSER_2_ENV
は、Cloud Composer 2 環境の名前。COMPOSER_2_LOCATION
は、環境が配置されているリージョン。
出力で、
/dags
フォルダを削除します。結果は、Cloud Composer 2 環境のバケットの URI です。たとえば、
gs://us-central1-example-916807e1-bucket/dags
をgs://us-central1-example-916807e1-bucket
に変更します。
変数とプールを含む JSON ファイルを、Cloud Composer 2 環境に移行します。
gcloud composer environments storage data export \ --destination=COMPOSER_2_BUCKET/data \ --environment=COMPOSER_1_ENV \ --location=COMPOSER_1_LOCATION \ --source=variables.json
gcloud composer environments storage data export \ --destination=COMPOSER_2_BUCKET/data \ --environment=COMPOSER_1_ENV \ --location=COMPOSER_1_LOCATION \ --source=pools.json
次のように置き換えます。
COMPOSER_2_BUCKET
は、前の手順で取得した Cloud Composer 2 環境バケットの URI。COMPOSER_1_ENV
は、Cloud Composer 1 環境の名前。COMPOSER_1_LOCATION
は、Cloud Composer 1 環境が配置されているリージョン。
変数とプールを Cloud Composer 2 にインポートします。
gcloud composer environments run \ COMPOSER_2_ENV \ --location COMPOSER_2_LOCATION \ variables import \ -- /home/airflow/gcs/data/variables.json
gcloud composer environments run \ COMPOSER_2_ENV \ --location COMPOSER_2_LOCATION \ pools import \ -- /home/airflow/gcs/data/pools.json
変数とプールがインポートされたことを確認します。
gcloud composer environments run \ COMPOSER_2_ENV \ --location COMPOSER_2_LOCATION \ variables list
gcloud composer environments run \ COMPOSER_2_ENV \ --location COMPOSER_2_LOCATION \ pools list
バケットから JSON ファイルを削除します。
gcloud composer environments storage data delete \ variables.json \ --environment=COMPOSER_2_ENV \ --location=COMPOSER_2_LOCATION
gcloud composer environments storage data delete \ pools.json \ --environment=COMPOSER_2_ENV \ --location=COMPOSER_2_LOCATION
gcloud composer environments storage data delete \ variables.json \ --environment=COMPOSER_1_ENV \ --location=COMPOSER_1_LOCATION
gcloud composer environments storage data delete \ pools.json \ --environment=COMPOSER_1_ENV \ --location=COMPOSER_1_LOCATION
ステップ 7: Cloud Composer 1 環境のバケットから他のデータを移行する
Cloud Composer 1 環境のバケットからプラグインや他のデータを移行します。
gcloud
プラグインを Cloud Composer 2 環境に移行します。これを行うには、Cloud Composer 1 環境のバケットから Cloud Composer 2 環境のバケットにある
/plugins
フォルダに、プラグインをエクスポートします。gcloud composer environments storage plugins export \ --destination=COMPOSER_2_BUCKET/plugins \ --environment=COMPOSER_1_ENV \ --location=COMPOSER_1_LOCATION
/plugins
フォルダが正常にインポートされたことを確認します。gcloud composer environments storage plugins list \ --environment=COMPOSER_2_ENV \ --location=COMPOSER_2_LOCATION
/data
フォルダを、Cloud Composer 1 環境から Airflow 2 環境にエクスポートします。gcloud composer environments storage data export \ --destination=COMPOSER_2_BUCKET/data \ --environment=COMPOSER_1_ENV \ --location=COMPOSER_1_LOCATION
/data
フォルダが正常にインポートされたことを確認します。gcloud composer environments storage data list \ --environment=COMPOSER_2_ENV \ --location=COMPOSER_2_LOCATION
ステップ 8: 接続を移行する
Airflow 1.10.15 では、接続のエクスポートがサポートされていません。接続を移行するには、Cloud Composer 1 環境の接続を、Cloud Composer 2 環境に手動で作成します。
gcloud
Cloud Composer 1 環境の接続のリストを取得するために、次のコマンドを実行します。
gcloud composer environments run COMPOSER_1_ENV \ --location COMPOSER_1_LOCATION \ connections -- --list
Cloud Composer 2 環境で新しい接続を作成するために、
gcloud
を使用してconnections
Airflow CLI コマンドを実行します。次に例を示します。gcloud composer environments run \ COMPOSER_2_ENV \ --location COMPOSER_2_LOCATION \ connections add \ -- --conn-host postgres.example.com \ --conn-port 5432 \ --conn-type postgres \ --conn-login example_user \ --conn-password example_password \ --conn-description "Example connection" \ example_connection
ステップ 9: ユーザー アカウントを移行する
このステップでは、手動で接続を作成してユーザーを移行する方法について説明します。
Airflow 1.10.15 では、ユーザーのエクスポートがサポートされていません。ユーザーと接続を移行するには、Cloud Composer 1 環境のユーザー アカウントを Airflow 2 環境に新しく手動で作成します。
Airflow UI
Cloud Composer 1 環境でユーザーの一覧を表示するには、次のようにします。
Cloud Composer 1 環境の Airflow ウェブ インターフェースを開きます。
[管理] > [ユーザー] に移動します。
Cloud Composer 2 環境でユーザーを作成するには、次のようにします。
Cloud Composer 2 環境の Airflow ウェブ インターフェースを開きます。
[セキュリティ] > [ユーザーを一覧表示] に移動します。
[新しいレコードの追加] をクリックします。
gcloud
-
Airflow 1 では、
gcloud
を使用してユーザーのリストを表示することはできません。Airflow UI を使用してください。 Cloud Composer 2 環境で新しいユーザー アカウントを作成するには、
gcloud
でusers create
Airflow CLI コマンドを実行します。次に例を示します。gcloud composer environments run \ COMPOSER_2_ENV \ --location COMPOSER_2_LOCATION \ users create \ -- --username example_username \ --firstname Example-Name \ --lastname Example-Surname \ --email example-user@example.com \ --use-random-password \ --role Op
次のように置き換えます。
COMPOSER_2_ENV
は、Cloud Composer 2 環境の名前。COMPOSER_2_LOCATION
は、Cloud Composer 2 環境が配置されているリージョン。- ユーザーのロールを含む、Cloud Composer 1 環境のすべてのユーザー構成パラメータとその値。
ステップ 10: DAG が Airflow 2 に対応する準備ができていることを確認する
DAG を Cloud Composer 1 環境に移行する前に、次のことを確認してください。
DAG に対するアップグレード チェック スクリプトが正常に実行され、互換性の問題が残っていない。
DAG で正しいインポート ステートメントを使用している。
たとえば、
BigQueryCreateDataTransferOperator
の新しいインポート ステートメントは次のようになります。from airflow.providers.google.cloud.operators.bigquery_dts \ import BigQueryCreateDataTransferOperator
DAG が Airflow 2 用にアップグレードされます。この変更は、Airflow 1.10.14 以降のバージョンと互換性があります。
ステップ 11: DAG を Cloud Composer 2 環境に移行する
DAG を環境間で転送すると、次の問題が発生する場合があります。
両方の環境で DAG が有効である(一時停止されていない)場合、各環境はスケジュールされたとおりに DAG の独自のコピーを実行します。これにより、同じデータと実行時間の重複した DAG 実行が発生する可能性があります。
DAG のキャッチアップのために、Airflow は DAG で指定された開始日に始まる追加の DAG 実行をスケジュールします。これは、新しい Airflow インスタンスで Cloud Composer 1 環境からの DAG 実行の履歴が考慮されないためです。このため、指定した開始日に始まるように大量の DAG 実行がスケジュールされる可能性があります。
DAG の重複実行を防止する
Cloud Composer 2 環境の Airflow 2 環境で、dags_are_paused_at_creation
オプションに対する Airflow 構成オプションのオーバーライドを追加します。この変更を行うと、デフォルトで新しいすべての DAG が一時停止されます。
セクション | キー | 値 |
---|---|---|
core |
dags_are_paused_at_creation |
True |
DAG 実行の追加や欠落を防ぐ
実行日のギャップや重複を回避するには、Cloud Composer 2 でのキャッチアップを無効にします。このようにして、DAG を Cloud Composer 2 環境にアップロードすると、Airflow では Cloud Composer 1 環境ですでに実行されている DAG 実行がスケジュールされません。catchup_by_default
オプションに対する Airflow 構成オプションのオーバーライドを追加します。
セクション | キー | 値 |
---|---|---|
scheduler |
catchup_by_default |
False |
DAG を Cloud Composer 2 環境に移行する
DAG を Cloud Composer 2 環境に移行するには、次のようにします。
Cloud Composer 1 環境から Cloud Composer 2 環境に DAG をアップロードします。
airflow_monitoring.py
DAG をスキップします。構成のオーバーライドにより、DAG は Cloud Composer 2 環境で一時停止され、DAG 実行はスケジュール設定されません。
Airflow ウェブ インターフェースで、[DAG] に移動し、報告された DAG 構文エラーを確認します。
DAG の転送を計画する時刻:
Cloud Composer 1 環境で、DAG を一時停止します。
Cloud Composer 2 環境で、DAG の一時停止を解除します。
新しい DAG 実行が正しい時間にスケジュール設定されていることを確認します。
Cloud Composer 2 環境で DAG の実行が発生するのを待ち、成功したかどうかを確認します。DAG の実行が成功した場合は、Cloud Composer 1 環境で一時停止を解除しないでください。解除すると、DAG が Cloud Composer 1 の環境で同じ日時に実行されます。
特定の DAG の実行が失敗した場合は、Cloud Composer 2 で正常に実行されるまで DAG のトラブルシューティングを行います。
必要な場合は、次のようにいつでも DAG の Cloud Composer 1 バージョンにフォールバックして、Cloud Composer 2 で失敗した DAG 実行を Cloud Composer 1 環境で実行できます。
Cloud Composer 2 環境で DAG を一時停止します。
Cloud Composer 1 環境で DAG の一時停止を解除します。これにより、Cloud Composer 1 環境で、一時停止された時間の DAG 実行を補う DAG がスケジュールされます。
ステップ 12: Cloud Composer 2 環境をモニタリングする
すべての DAG と構成を Cloud Composer 2 環境に移行した後、潜在的な問題、失敗した DAG 実行、環境全体の健全性をモニタリングします。Cloud Composer 2 環境が、十分な時間、問題なく動作している場合は、Cloud Composer 1 環境の削除を検討してください。