DAG(ワークフロー)のテスト

DAG を本番環境にデプロイする前に、Airflow CLI サブコマンドを実行して、DAG を実行するコンテキストと同じコンテキストで DAG コードを解析できます。

DAG 作成中にテストする

タスク インスタンスを個別にローカルで実行してログ出力を表示できます。 出力を表示できるので、構文エラーとタスクエラーを確認できます。 ローカルでのテストでは、データベースとの依存関係や通信状況はチェックされません。

DAG は、テスト環境の data/test フォルダに配置することをおすすめします。

PyPI パッケージ エラーを確認する

PyPI 依存関係は Airflow に必要な依存関係と競合する可能性があるため、ローカルで Airflow ワーカー コンテナに目的の Python パッケージをインストールしてパッケージをテストすることをおすすめします。

  1. Cloud Composer 環境の GKE クラスタを特定します。

  2. GKE クラスタに接続します。

  3. Airflow ワーカー Pod を表示して選択します。

    kubectl get pods --all-namespaces

    airflow-worker-1a2b3c-x0yz などの名前の付いた Pod を探します。

  4. Airflow ワーカー コンテナ内のリモートシェルに接続します。

    kubectl -n composer-1-6-0-airflow-example-namespace \
      exec -it airflow-worker-1a2b3c-x0yz -c airflow-worker -- /bin/bash

    リモートシェルへの接続中に、コマンド プロンプトに Airflow ワーカー Pod の名前(airflow-worker-1a2b3c-x0yz: など)が表示されます。

  5. Python パッケージを Air 環境のワーカー コンテナにインストールします。たとえば、次のように Python パッケージをインストールします。

    sudo python2 -m pip install "[PACKAGE]"

  6. Airflow ワーカー コンテナでの互換性をテストします。

    • 構文エラーを確認します。
      airflow list_dags
    • テンプレートをレンダリングします。
      airflow test --dry_run [DAG_ID] [TASK_ID] [EXECUTION_DATE]
    • タスクエラーを確認します。

      airflow test [DAG_ID] [TASK_ID] [EXECUTION_DATE]

  7. 以下のように、Airflow ワーカー コンテナから Python パッケージをアンインストールします。

    sudo python2 -m pip uninstall "[PACKAGE]"

構文エラーを確認する

  1. ご使用の環境の Cloud Storage バケットにテスト ディレクトリを作成します。
  2. 構文エラーを確認するには、次の gcloud コマンドを入力します。

    gcloud composer environments run ENVIRONMENT_NAME \
     --location LOCATION \
     list_dags -- -sd /home/airflow/gcs/data/test

    ここで

    • ENVIRONMENT_NAME は、環境の名前です。
    • LOCATION は、環境が配置される Compute Engine のリージョンです。

    例:

    gcloud composer environments run \
     test-environment --location us-central1 \
     list_dags -- -sd /home/airflow/gcs/data/test

タスクエラーを確認する

タスク固有のエラーを確認するには、次の gcloud コマンドを入力します。

gcloud composer environments run ENVIRONMENT_NAME \
  --location LOCATION \
  test -- -sd /home/airflow/gcs/data/test DAG_ID \
  TASK_ID DAG_EXECUTION_DATE

ここで

  • ENVIRONMENT_NAME は、環境の名前です。
  • LOCATION は、環境が配置される Compute Engine のリージョンです。
  • DAG_ID は、DAG の ID です。
  • TASK_ID は、タスクの ID です。
  • DAG_EXECUTION_DATE は、DAG の実行日です。この日付はテンプレートとして使用されます。ここで指定した日付に関係なく、DAG はすぐに実行されます。

例:

gcloud composer environments run test-environment --location us-central1 \
        -- -sd /home/airflow/gcs/data/test-dags hello_world print_date 2018-09-03

デプロイされた DAG の更新とテスト

テスト環境で DAG の更新をテストするには、次の手順を行います。

  1. 更新する対象のデプロイ済み DAG を data/test にコピーします。
  2. DAG を更新します。
  3. DAG をテストします。
    1. 構文エラーを確認します
    2. タスク固有のエラーを確認します
  4. DAG が正常に実行されることを確認してください。
  5. テスト環境で DAG をオフにします。
    1. [Airflow UI] > [DAG] ページに移動します。
    2. 変更中の DAG が常に実行されている場合は、DAG をオフにします。
    3. 未処理のタスクをすばやく実行するには、タスクをクリックしてから、Mark Success をクリックします。
  6. DAG を本番環境にデプロイします。
    1. 本番環境で DAG をオフにします。
    2. 本番環境の dags/ フォルダに、更新された DAG をアップロードします

ワークフローのテストに関するよくある質問

次のステップ