コンテンツに移動
データ分析

Cloud Composer で Airflow DAG 解析時間を短縮す

2023年8月24日
Google Cloud Japan Team

※この投稿は米国時間 2023 年 8 月 11 日に、Google Cloud blog に投稿されたものの抄訳です。

このブログ記事では、DAG 解析時間をモニタリング、トラブルシューティング、最小化するためのさまざまな方法を広くご紹介します。こうした方法は、以下の点でCloud Composer / Airflow における顕著なパフォーマンスの向上につながります。

  • より大きなワークロードを効率的に処理し、より多くの DAG に対応することで、環境のスケーラビリティを向上させる。

  • タスクの重複とリソース競合の可能性を低めることで、環境の安定性を高める。

  • フィードバック ループの高速化と処理時間の短縮により、デベロッパーの生産性と全体的な効率を向上させる。

DAG 解析時間の短縮は、健全な Cloud Composer / Airflow 環境の信頼できる指標になります。

使ってみる

Airflow DAG とは

Airflow DAG(有向非巡回グラフ)とはタスクの集まりで、タスク同士の関係や依存関係を反映するように編成されます。DAG は Python スクリプトで定義される、Airflow の中心概念です。

DAG は以下の 4 つのものを定義します。

  1. 実行する必要のあるタスク

  2. タスクの実行順序

  3. タスク間の依存関係

  4. タスクの実行スケジュール

DAG は複雑なワークフローを定義および管理する優れた方法です。DAG を使用すれば、タスクの自動化、タスクのスケジューリング、タスクの実行のモニタリングが可能になります。

Airflow スケジューラとは

Airflow スケジューラは、すべてのタスクと DAG をモニタリングして、依存するタスクが完了するとタスク インスタンスをトリガーします。スケジューラは、デフォルトで 30 秒に一度、DAG 解析結果を収集し、トリガー可能なアクティブなタスクの有無を確認します。

DAG プロセッサとは

Airflow 2.3.0 以降では、DAG プロセッサは Airflow スケジューラから分離されました。この変更の詳細については、AIP-43 DAG プロセッサの分離をご覧ください。

モニタリングとアラート

DAG 解析時間のモニタリング

Google Cloud コンソールでは、モニタリング ページとログタブを使用して DAG 解析時間を調べることができます。

Cloud Composer 環境の場合

Cloud Composer 環境で DAG 解析時間を確認するには、以下のコマンドを実行します。

読み込んでいます...

ローカルで time コマンドを使用する場合

読み込んでいます...

キャッシュ保存の影響を判別できるよう、複数回連続して実行するようにしてください。最適化の影響を評価するためには、最適化前後(使用するマシンや環境などの条件は同じにします)の結果を比較してください。

出力例:

読み込んでいます...

重要な指標は「real time」です。これで、DAG の処理にかかった時間がわかります。この方法でファイルを読み込む場合、新しいインタープリタを起動しているため、Airflow が DAG を解析するときには存在しない最初の読み込み時間があるので注意してください。以下を実行することで、初期化の時間を評価できます。

読み込んでいます...

結果:

読み込んでいます...

この場合、最初のインタープリタの起動時間は最長約 0.07 秒です。これは、上記の example_python_operator.py を解析するのに必要な時間の約 10% であるため、この例の DAG の場合、実際の解析時間は最長約 0.62 秒です。

理想的な解析時間の指標とは

モニタリング ダッシュボードの DAG の統計情報 セクションで、合計 DAG 解析時間のグラフを確認します。数値が約 10 秒を超える場合、スケジューラが DAG 解析により過負荷となり、DAG を効果的に実行できない可能性があります。

長い解析時間のアラートを受け取る方法

アラート ポリシーを作成して指標の値をモニタリングすると、条件に違反した場合に通知できます。これは Composer モニタリング ダッシュボードで行うことも可能です。

DAG コードの最適化

DAG コード改善の汎用的な手順

Airflow DAG の改良による Cloud Composer の最適化に、Apache Airflow DAG を作成する際に行う作業の汎用的なチェックリストが記載されています。リストの項目は、Google Cloud とオープンソース コミュニティが策定したベスト プラクティスに沿ったものとなっています。一連の高パフォーマンスの DAG によって Cloud Composer の動作が最適化され、標準化された作成方法によって、デベロッパーが数百個、あるいは数千個の DAG でも管理できるようになります。チェック項目のそれぞれが、Cloud Composer 環境と開発プロセスにメリットをもたらします。特に優先順位が高いのは、トップレベルのコードを制限することと、トップレベルのコードで変数 / XCom の使用を避けることの 2 つです。

トップレベルのコードの制限

確立されたベスト プラクティスに従ってください。演算子を作成し、演算子間の DAG リレーションを構築するのに必要のないトップレベルのコードを記述することは避けるべきです。これは、Airflow スケジューラの設計上の決定によるものです。また、トップレベルのコードの解析速度が Airflow のパフォーマンスとスケーラビリティの両方に影響するためです。

Python 開発者が見落としがちなことですが、DAG 読み込み時間に影響する重要な要因の一つに、トップレベルのインポートに驚くほど長い時間(数秒程度)がかかり、多くのオーバーヘッドを生成する可能性がある、ということがあります。これは、たとえば Python の呼び出し可能オブジェクト内部でローカル インポートに変換することで簡単に回避できます。

トップレベルのコードで変数と XCom の使用を避ける

トップレベルのコードで Variable.get() を使用している場合、.py ファイルが解析されるたびに、Airflow は DB へのセッションを開く Variable.get() を実行します。これは解析時間を著しく遅くする可能性があります。

どうしても必要な場合は、JSON の辞書または Jinja テンプレートを値として使用してください。(辞書内の多数の値に対して 1 つの接続)

DAG フォルダのクリーンアップ

DAG フォルダから未使用の DAG、不要なファイルを削除する

Airflow スケジューラは、使用されていない DAG フォルダ内のファイルを解析するために時間とリソースを無駄に消費します。

.airflowignore を使用する

 .airflowignore ファイルは、Airflow が意図的に無視する DAG_FOLDER または PLUGINS_FOLDER 内のディレクトリやファイルを指定します。Airflow は、DAG_IGNORE_FILE_SYNTAX 構成パラメータ(Airflow 2.3 で追加)により指定されるファイル内パターンの 2 種類のシンタックスをサポートしています。正規表現と glob です。

無視されるファイルが多いほど、Airflow スケジューラが解析するファイルが少なくなります。

一時停止した DAG を確認する

一時停止した DAG は、Airflow スケジューラによって引き続き解析されます。  各 DAG が一時停止した理由を特定し、削除、無視、一時停止解除のいずれが必要かを判断してください。

Airflow の構成

min_file_process_interval

スケジューラは、min_file_process_interval の秒数ごとに DAG ファイルを解析します。Airflow は、この間隔が終了した後にのみ、更新された DAG コードの使用を開始します。

変更の頻度が低い DAG が多数存在する場合や、スケジューラの負荷全般が高い場合は、この間隔を長くすることを検討してください。DAG の解析速度を上げるには、この間隔を短くすることを検討してください。DAG への更新はこの間隔の後に反映されます。この数値を低く保つと、CPU 使用率が増えます。

たとえば、1,000 以上の DAG ファイルがある場合、min_file_process_interval を 600 (10 分)、6,000 (100 分)またはそれ以上の値に上げてください。

dag_dir_list_interval

dag_dir_list_interval は、Airflow が DAG ディレクトリをスキャンする頻度を秒単位で決定します。この値を低くすると、新しい DAG がより速く処理されますが、CPU 使用率が増えます。

DAG ディレクトリの一覧表示の間隔を長くすると、環境のバケット内の新しい DAG の検出に関連するスケジューラの負荷が軽減されます。新しい DAG を頻繁にデプロイしない場合は、この間隔を長くすることを検討してください。新しくデプロイされた DAG ファイルへの Airflow の反応を速くしたい場合は、この間隔を短くすることを検討してください。

parsing_processes

DAG プロセッサは、DAG を解析するために複数のプロセスを並行して実行できます。parsing_processes(旧称 max_threads)により、並行して実行できるプロセスの数が決まります。この値を大きくすると、DAG が大量にある場合に DAG をシリアル化するのに役立ちます。デフォルトの設定値は 2 です。

読み込んでいます...

file_parsing_sort_mode 

複数の Airflow スケジューラを実行している場合は、以下の file_parsing_sort_mode オプションを評価してください。スケジューラは DAG ファイルを一覧表示して並べ替え、解析順序を決定します。

  • modified_time: ファイルの更新時間で並べ替えます。これは、最近更新された DAG から大規模に解析する場合に便利です。(デフォルト)

  • random_seeded_by_host: 複数のスケジューラ間でランダムに並べ替えますが、同じホスト上では同じ順序で並べ替えます。これは、各スケジューラが異なる DAG ファイルを解析できる HA モードのスケジューラで実行している場合に便利です。

  • alphabetical: ファイル名で並べ替えます。

大量(1,000 以上)の DAG ファイルがある場合、file_parsing_sort_modemodified_time に変更することで、新しいファイルの解析を優先させることができます。

Cloud Composer のアップグレード

ここまで進めてもまだ DAG 解析時間が長い場合は、Cloud Composer 環境にリソースを追加することを検討する必要があります。注: これにより Cloud Composer 環境の全体的な費用が増加します。

Airflow スケジューラの数の変更 / 増加

スケジューラの数を調整すると、スケジューラの容量と Airflow スケジューリングのレジリエンスが改善されます。注意: 特別に考慮すべき条件がない限り、Cloud Composer 環境に 4 つ以上の Airflow スケジューラを構成しないでください。

スケジューラの数を増やすと、Airflow データベースとの間のトラフィックが増加します。ほとんどの場合、使用する Airflow スケジューラの数を 2 つにすることをおすすめします。

Airflow スケジューラの CPU / メモリの増加

環境で使用される CPU 数、メモリ容量、ディスク容量を指定できます。このようにして、複数のワーカーとスケジューラによる水平スケーリングに加え、環境のパフォーマンスも向上させることができます。

まとめ

ここでご紹介した手順を進めれば、Cloud Composer / Airflow のメリットを最大限に引き出すことができます。また、環境のパフォーマンスを向上させ、よりスムーズに動作する開発エクスペリエンスを生み出すことができます。


- 戦略的クラウド エンジニア Christian Yarros

投稿先