Airflow DAG のテスト

Cloud Composer 1 | Cloud Composer 2

DAG を本番環境にデプロイする前に、Airflow CLI サブコマンドを実行して、DAG を実行するコンテキストと同じコンテキストで DAG コードを解析できます。

Composer ローカル開発 CLI ツールを使用して DAG をローカルでテストする

Composer ローカル開発 CLI ツールは、Airflow 環境をローカルで実行することにより、Cloud Composer 2 向けの Apache Airflow DAG 開発を合理化します。このローカルの Airflow 環境では、Cloud Composer 2 の特定のバージョンのイメージが使用されます。

このローカル Airflow 環境を使用して DAG を開発してテストし、DAG をテスト用 Cloud Composer 環境に移行できます。このガイドの残りの部分では、テスト用 Cloud Composer 環境で DAG をテストします。

DAG 作成中にテストする

タスク インスタンスを個別にローカルで実行してログ出力を表示できます。 出力を表示できるので、構文エラーとタスクエラーを確認できます。 ローカルでのテストでは、データベースとの依存関係や通信状況はチェックされません。

DAG は、テスト環境の data/test フォルダに配置することをおすすめします。

テスト ディレクトリを作成する

環境のバケット内にテスト ディレクトリを作成し、そのディレクトリに DAG をコピーします。

gsutil cp -r BUCKET_NAME/dags \
  BUCKET_NAME/data/test

以下を置き換えます。

  • BUCKET_NAME: Cloud Composer 環境に関連付けられたバケットの名前。

例:

gsutil cp -r gs://us-central1-example-environment-a12bc345-bucket/dags \
  gs://us-central1-example-environment-a12bc345-bucket/data/test

DAG のアップロードの詳細については、DAG を追加および更新するをご覧ください。

構文エラーを確認する

/data/test フォルダにアップロードした DAG の構文エラーを確認するには、次の gcloud コマンドを入力します。

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location ENVIRONMENT_LOCATION \
  dags list -- --subdir /home/airflow/gcs/data/test

以下を置き換えます。

  • ENVIRONMENT_NAME: 環境の名前。
  • ENVIRONMENT_LOCATION: 環境が配置されているリージョン。

タスクエラーを確認する

/data/test フォルダにアップロードした DAG のタスク固有のエラーを確認するには、次の gcloud コマンドを実行します。

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location ENVIRONMENT_LOCATION \
  tasks test -- --subdir /home/airflow/gcs/data/test \
  DAG_ID TASK_ID \
  DAG_EXECUTION_DATE

以下を置き換えます。

  • ENVIRONMENT_NAME: 環境の名前。
  • ENVIRONMENT_LOCATION: 環境が配置されているリージョン。
  • DAG_ID: DAG の ID。
  • TASK_ID: タスクの ID。
  • DAG_EXECUTION_DATE: DAG の実行日。この日付はテンプレートとして使用されます。ここで指定した日付に関係なく、DAG はすぐに実行されます。

例:

gcloud composer environments run \
  example-environment \
  --location us-central1 \
  tasks test -- --subdir /home/airflow/gcs/data/test \
  hello_world print_date 2021-04-22

デプロイされた DAG の更新とテスト

テスト環境で DAG の更新をテストするには、次の手順を行います。

  1. 更新する対象のデプロイ済み DAG を data/test にコピーします。
  2. DAG を更新します。
  3. DAG をテストします。
    1. 構文エラーを確認します
    2. タスク固有のエラーを確認します
  4. DAG が正常に実行されることを確認してください。
  5. テスト環境で DAG をオフにします。
    1. [Airflow UI] > [DAG] ページに移動します。
    2. 変更中の DAG が常に実行されている場合は、DAG をオフにします。
    3. 未処理のタスクをすばやく実行するには、タスクをクリックしてから、Mark Success をクリックします。
  6. DAG を本番環境にデプロイします。
    1. 本番環境で DAG をオフにします。
    2. 本番環境の dags/ フォルダに、更新された DAG をアップロードします

DAG のテストに関するよくある質問

本番環境とテスト環境で DAG 実行を分離するにはどうすればよいですか?

たとえば、DAG のすべての実行で Airflow が共通して使用する dags/ フォルダに、ソースコードのグローバル リポジトリがあるとします。DAG の実行を妨げずに、本番環境またはテスト環境のソースコードを更新する必要があります。

Airflow では強力な DAG 分離は提供されていません。したがって、テスト環境 DAG が本番環境 DAG と干渉しないように、本番用とテスト用の Cloud Composer 環境を別々に保持することをおすすめします。

異なる GitHub ブランチから統合テストを実行するときに DAG の干渉を回避するにはどうすればよいですか?

干渉を防ぐには一意のタスク名を使用します。たとえば、タスク ID の前にブランチ名を付けます。

Airflow との統合テストのベスト プラクティスは何ですか?

Airflow との統合テストには専用の環境を使用することをおすすめします。DAG の実行成功を保証するために、Cloud Storage フォルダ内のファイルに書き込みを行った後、独自の統合テストケースに沿ってその内容を確認することができます。

他の DAG コントリビューターと効率的なコラボレーションをするにはどうすればよいですか?

開発のために、コントリビューターごとに data/ フォルダにサブディレクトリを用意できます。

data/ フォルダに追加された DAG は、Airflow スケジューラまたはウェブサーバーによって自動的に取得されません

手動の DAG 実行を作成するには、gcloud composer environments run コマンドと test サブコマンドを使用します。その際、コントリビューターの開発ディレクトリは --subdir フラグで指定します。

例:

gcloud composer environments run test-environment-name \
  tasks test -- dag-id task-id execution-date \
  --subdir /home/airflow/gcs/data/alice_dev

デプロイ環境と本番環境を同期するにはどうすればよいですか?

アクセスを管理するには、次のようにします。

開発環境から本番環境にデプロイするには次のようにします。

  • 環境変数や PyPI パッケージなどの構成の整合性を確保します。

  • DAG 引数の整合性を確保します。ハードコードを避けるために、Airflow のマクロと変数を使用することをおすすめします。

    例:

    gcloud composer environments run test-environment-name \
      variables set -- DATA_ENDPOINT_KEY DATA_ENDPOINT_VALUE
    

次のステップ