DAG(ワークフロー)のテスト

DAG を本番環境にデプロイする前に、Airflow CLI サブコマンドを実行して、DAG を実行するコンテキストと同じコンテキストで DAG コードを解析できます。

DAG 作成中にテストする

タスク インスタンスを個別にローカルで実行してログ出力を表示できます。 出力を表示できるので、構文エラーとタスクエラーを確認できます。 ローカルでのテストでは、データベースとの依存関係や通信状況はチェックされません。

DAG は、テスト環境の data/test フォルダに配置することをおすすめします。

テスト ディレクトリの作成

  1. ご使用の環境の Cloud Storage バケットにテスト ディレクトリを作成し、DAG をそのディレクトリにコピーします。

    gsutil

    gsutil cp -r BUCKET_NAME/dags BUCKET_NAME/data/test
    

    ここで

    • BUCKET_NAME は、Cloud Composer 環境に関連付けられている Cloud Storage バケットの名前です。

    次に例を示します。

    gsutil

    gsutil cp -r gs://us-central1-test-environment-a12bc345-bucket/dags gs://us-central1-test-environment-a12bc345-bucket/data/test
    

構文エラーを確認する

  1. ご使用の環境の Cloud Storage バケットテスト ディレクトリを作成し、DAG をそのディレクトリにコピーします

  2. 構文エラーを確認するには、次の gcloud コマンドを入力します。

    Airflow 1.10 CLI

    gcloud composer environments run \
      ENVIRONMENT_NAME \
      --location ENVIRONMENT_LOCATION \
       list_dags -- -sd /home/airflow/gcs/data/test
    

    Airflow 2.0 CLI

    gcloud beta composer environments run \
      ENVIRONMENT_NAME \
      --location ENVIRONMENT_LOCATION \
       dags list -- --subdir /home/airflow/gcs/data/test
    

    ここで

    • ENVIRONMENT_NAME は、環境の名前です。
    • ENVIRONMENT_LOCATION は、環境が配置される Compute Engine のリージョンです。

    例:

    Airflow 1.10 CLI

    gcloud composer environments run \
    test-environment --location us-central1 \
    list_dags -- -sd /home/airflow/gcs/data/test
    

    Airflow 2.0 CLI

    gcloud beta composer environments run \
    test-environment --location us-central1 \
    dags list -- --subdir /home/airflow/gcs/data/test
    

タスクエラーを確認する

  1. ご使用の環境の Cloud Storage バケットテスト ディレクトリを作成し、DAG をそのディレクトリにコピーします

  2. タスク固有のエラーを確認するには、次の gcloud コマンドを入力します。

Airflow 1.10 CLI

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location ENVIRONMENT_LOCATION \
  test -- -sd /home/airflow/gcs/data/test DAG_ID \
  TASK_ID DAG_EXECUTION_DATE

Airflow 2.0 CLI

gcloud beta composer environments run \
  ENVIRONMENT_NAME \
  --location ENVIRONMENT_LOCATION \
  tasks test -- --subdir /home/airflow/gcs/data/test \
  DAG_ID TASK_ID \
  DAG_EXECUTION_DATE

ここで

  • ENVIRONMENT_NAME は、環境の名前です。
  • ENVIRONMENT_LOCATION は、環境が配置される Compute Engine のリージョンです。
  • DAG_ID は、DAG の ID です。
  • TASK_ID は、タスクの ID です。
  • DAG_EXECUTION_DATE は、DAG の実行日です。この日付はテンプレートとして使用されます。ここで指定した日付に関係なく、DAG はすぐに実行されます。

例:

Airflow 1.10 CLI

gcloud composer environments run test-environment \
  --location us-central1 \
  test -- -sd /home/airflow/gcs/data/test \
  hello_world print_date 2021-04-22

Airflow 2.0 CLI

gcloud beta composer environments run \
  test-environment \
  --location us-central1 \
  tasks test -- --subdir /home/airflow/gcs/data/test \
  hello_world print_date 2021-04-22

デプロイされた DAG の更新とテスト

テスト環境で DAG の更新をテストするには、次の手順を行います。

  1. 更新する対象のデプロイ済み DAG を data/test にコピーします。
  2. DAG を更新します。
  3. DAG をテストします。
    1. 構文エラーを確認します
    2. タスク固有のエラーを確認します
  4. DAG が正常に実行されることを確認してください。
  5. テスト環境で DAG をオフにします。
    1. [Airflow UI] > [DAG] ページに移動します。
    2. 変更中の DAG が常に実行されている場合は、DAG をオフにします。
    3. 未処理のタスクをすばやく実行するには、タスクをクリックしてから、Mark Success をクリックします。
  6. DAG を本番環境にデプロイします。
    1. 本番環境で DAG をオフにします。
    2. 本番環境の dags/ フォルダに、更新された DAG をアップロードします

DAG のテストに関するよくある質問

本番環境とテスト環境で DAG 実行を分離するにはどうすればよいですか?

たとえば、DAG のすべての実行で Airflow が共通して使用する dags/ フォルダに、ソースコードのグローバル リポジトリがあるとします。DAG の実行を妨げずに、本番環境またはテスト環境のソースコードを更新する必要があります。

Airflow では強力な DAG 分離は提供されていません。したがって、テスト環境 DAG が本番環境 DAG と干渉しないように、本番用とテスト用の Cloud Composer 環境を別々に保持することをおすすめします。

異なる GitHub ブランチから統合テストを実行するときに DAG の干渉を回避するにはどうすればよいですか?

干渉を防ぐには一意のタスク名を使用します。たとえば、タスク ID の前にブランチ名を付けます。

Airflow との統合テストのベスト プラクティスは何ですか?

Airflow との統合テストには専用の環境を使用することをおすすめします。DAG の実行成功を保証するために、Cloud Storage フォルダ内のファイルに書き込みを行った後、独自の統合テストケースに沿ってその内容を確認することができます。

他の DAG コントリビューターと効率的なコラボレーションをするにはどうすればよいですか?

開発のために、コントリビューターごとに data/ フォルダにサブディレクトリを用意できます。

data/ フォルダに追加された DAG は、Airflow スケジューラまたはウェブサーバーによって自動的に取得されません

手動の DAG 実行を作成するには、gcloud composer environments run コマンドと test サブコマンドを使用します。その際、コントリビューターの開発ディレクトリは --subdir フラグで指定します。

次に例を示します。

gcloud composer environments run test-environment-name \
    test -- dag-id task-id execution-date \
    --subdir /home/airflow/gcs/data/alice_dev

Airflow 2.0 CLI では、tasks test コマンドを使用します。

gcloud beta composer composer environments run test-environment-name \
    tasks test -- dag-id task-id execution-date \
    --subdir /home/airflow/gcs/data/alice_dev

デプロイ環境と本番環境を同期するにはどうすればよいですか?

アクセスを管理するには、次のようにします。

開発環境から本番環境にデプロイするには次のようにします。

  • 環境変数や PyPI パッケージなどの構成の整合性を確保します。

  • DAG 引数の整合性を確保します。ハードコードを避けるために、Airflow のマクロと変数を使用することをおすすめします。

    次に例を示します。

    gcloud composer environments run test-environment-name \
        variables -- --set DATA_ENDPOINT_KEY DATA_ENDPOINT_VALUE
    

    Airflow 2.0 CLI では、variables set コマンドを使用します。

    gcloud beta composer composer environments run test-environment-name \
        variables set -- DATA_ENDPOINT_KEY DATA_ENDPOINT_VALUE
    

次のステップ