Cloud Composer 1 | Cloud Composer 2
DAG を本番環境にデプロイする前に、Airflow CLI サブコマンドを実行して、DAG を実行するコンテキストと同じコンテキストで DAG コードを解析できます。
DAG 作成中にテストする
タスク インスタンスを個別にローカルで実行してログ出力を表示できます。 出力を表示できるので、構文エラーとタスクエラーを確認できます。 ローカルでのテストでは、データベースとの依存関係や通信状況はチェックされません。
DAG は、テスト環境の data/test
フォルダに配置することをおすすめします。
テスト ディレクトリを作成する
環境のバケット内にテスト ディレクトリを作成し、そのディレクトリに DAG をコピーします。
gsutil cp -r BUCKET_NAME/dags \
BUCKET_NAME/data/test
以下を置き換えます。
BUCKET_NAME
: Cloud Composer 環境に関連付けられたバケットの名前。
例:
gsutil cp -r gs://us-central1-example-environment-a12bc345-bucket/dags \
gs://us-central1-example-environment-a12bc345-bucket/data/test
DAG のアップロードの詳細については、DAG を追加および更新するをご覧ください。
構文エラーを確認する
/data/test
フォルダにアップロードした DAG の構文エラーを確認するには、次の gcloud
コマンドを入力します。
Airflow 2
gcloud composer environments run \
ENVIRONMENT_NAME \
--location ENVIRONMENT_LOCATION \
dags list -- --subdir /home/airflow/gcs/data/test
Airflow 1
gcloud composer environments run \
ENVIRONMENT_NAME \
--location ENVIRONMENT_LOCATION \
list_dags -- -sd /home/airflow/gcs/data/test
以下を置き換えます。
ENVIRONMENT_NAME
: 環境の名前。ENVIRONMENT_LOCATION
: 環境が配置されているリージョン。
タスクエラーを確認する
/data/test
フォルダにアップロードした DAG のタスク固有のエラーを確認するには、次の gcloud
コマンドを実行します。
Airflow 2
gcloud composer environments run \
ENVIRONMENT_NAME \
--location ENVIRONMENT_LOCATION \
tasks test -- --subdir /home/airflow/gcs/data/test \
DAG_ID TASK_ID \
DAG_EXECUTION_DATE
Airflow 1
gcloud composer environments run \
ENVIRONMENT_NAME \
--location ENVIRONMENT_LOCATION \
test -- -sd /home/airflow/gcs/data/test DAG_ID \
TASK_ID DAG_EXECUTION_DATE
以下を置き換えます。
ENVIRONMENT_NAME
: 環境の名前。ENVIRONMENT_LOCATION
: 環境が配置されているリージョン。DAG_ID
: DAG の ID。TASK_ID
: タスクの ID。DAG_EXECUTION_DATE
: DAG の実行日。この日付はテンプレートとして使用されます。ここで指定した日付に関係なく、DAG はすぐに実行されます。
例:
Airflow 2
gcloud composer environments run \
example-environment \
--location us-central1 \
tasks test -- --subdir /home/airflow/gcs/data/test \
hello_world print_date 2021-04-22
Airflow 1
gcloud composer environments run example-environment \
--location us-central1 \
test -- -sd /home/airflow/gcs/data/test \
hello_world print_date 2021-04-22
デプロイされた DAG の更新とテスト
テスト環境で DAG の更新をテストするには、次の手順を行います。
- 更新する対象のデプロイ済み DAG を
data/test
にコピーします。 - DAG を更新します。
- DAG をテストします。
- DAG が正常に実行されることを確認してください。
- テスト環境で DAG をオフにします。
- [Airflow UI] > [DAG] ページに移動します。
- 変更中の DAG が常に実行されている場合は、DAG をオフにします。
- 未処理のタスクをすばやく実行するには、タスクをクリックしてから、Mark Success をクリックします。
- DAG を本番環境にデプロイします。
- 本番環境で DAG をオフにします。
- 本番環境の
dags/
フォルダに、更新された DAG をアップロードします。
DAG のテストに関するよくある質問
本番環境とテスト環境で DAG 実行を分離するにはどうすればよいですか?
たとえば、DAG のすべての実行で Airflow が共通して使用する dags/
フォルダに、ソースコードのグローバル リポジトリがあるとします。DAG の実行を妨げずに、本番環境またはテスト環境のソースコードを更新する必要があります。
Airflow では強力な DAG 分離は提供されていません。したがって、テスト環境 DAG が本番環境 DAG と干渉しないように、本番用とテスト用の Cloud Composer 環境を別々に保持することをおすすめします。
異なる GitHub ブランチから統合テストを実行するときに DAG の干渉を回避するにはどうすればよいですか?
干渉を防ぐには一意のタスク名を使用します。たとえば、タスク ID の前にブランチ名を付けます。
Airflow との統合テストのベスト プラクティスは何ですか?
Airflow との統合テストには専用の環境を使用することをおすすめします。DAG の実行成功を保証するために、Cloud Storage フォルダ内のファイルに書き込みを行った後、独自の統合テストケースに沿ってその内容を確認することができます。
他の DAG コントリビューターと効率的なコラボレーションをするにはどうすればよいですか?
開発のために、コントリビューターごとに data/
フォルダにサブディレクトリを用意できます。
data/
フォルダに追加された DAG は、Airflow スケジューラまたはウェブサーバーによって自動的に取得されません
手動の DAG 実行を作成するには、gcloud composer environments run
コマンドと test
サブコマンドを使用します。その際、コントリビューターの開発ディレクトリは --subdir
フラグで指定します。
例:
Airflow 2
gcloud composer environments run test-environment-name \
tasks test -- dag-id task-id execution-date \
--subdir /home/airflow/gcs/data/alice_dev
Airflow 1
gcloud composer environments run test-environment-name \
test -- dag-id task-id execution-date \
--subdir /home/airflow/gcs/data/alice_dev
デプロイ環境と本番環境を同期するにはどうすればよいですか?
アクセスを管理するには、次のようにします。
認証にはサービス アカウントを使用します。
アクセス制御には、Identity and Access Management と Cloud Composer のロールと権限を使用します。
開発環境から本番環境にデプロイするには次のようにします。
環境変数や PyPI パッケージなどの構成の整合性を確保します。
DAG 引数の整合性を確保します。ハードコードを避けるために、Airflow のマクロと変数を使用することをおすすめします。
例:
Airflow 2
gcloud composer environments run test-environment-name \ variables set -- DATA_ENDPOINT_KEY DATA_ENDPOINT_VALUE
Airflow 1
gcloud composer environments run test-environment-name \ variables -- --set DATA_ENDPOINT_KEY DATA_ENDPOINT_VALUE