Google Cloud Composer での Airflow DAG とタスクの同時実行について理解する
Christian Yarros
Strategic Cloud Engineer, Google
※この投稿は米国時間 2024 年 7 月 26 日に、Google Cloud blog に投稿されたものの抄訳です。
Apache Airflow は、データ ワークフローのオーケストレーションによく利用されているツールです。Google Cloud は Cloud Composer と呼ばれるマネージド Airflow サービスを提供しています。これは、Apache Airflow 上に構築されたフルマネージド ワークフロー オーケストレーション サービスで、パイプラインの作成、スケジュール設定、モニタリングを可能にします。
Airflow は人気があり、使いやすいものの、Airflow インストールにはさまざまなコンポーネントと多数の構成設定があるため、DAG(有向非巡回グラフ)とタスクの同時実行の細かな部分は一見わかりにくい場合があります。同時実行戦略を理解して実装すると、データ パイプラインでのリソースの使用率が最適化され、スケーラビリティが改善し、フォールト トレランスが向上します。このガイドは、Airflow の同時実行について知っておくべきことを 4 つのレベルにわたって説明することを目的としています。
-
Composer 環境
-
Airflow インストール
-
DAG
-
タスク
各セクションの図解は、Airflow タスクが意図したとおりに実行されるようにするには、どの設定を調整する必要があるかを理解するのに役立ちます。では、さっそく始めましょう。
1. Cloud Composer 2 環境レベルの設定
これはこの Google Cloud サービス全体が対象となります。これには、Airflow を実行するために必要なすべてのマネージド インフラストラクチャと、Cloud Logging や Cloud Monitoring などの他の Google Cloud サービスとのインテグレーションが含まれます。このレベルの構成は、Airflow インストール、DAG、およびタスクによって継承されます。
ワーカーの最小数と最大数
Cloud Composer 環境を構築する際には、Airflow ワーカーの最小数と最大数、およびワーカーサイズ(CPU、メモリ、ストレージ)を指定します。これらの構成により、worker_concurrency のデフォルト値が決まります。
Terraform の例:
ワーカーの同時実行
1 つの CPU を持つワーカーは通常、12 個の同時タスクを処理できます。Cloud Composer 2 では、ワーカーの同時実行のデフォルト値は次のようになります。
-
Airflow 2.3.3 以降のバージョンでは、32、12 * worker_CPU、8 * worker_memory のうちの最小値。
-
Airflow 2.3.3 より前のバージョンでは、12 * worker_CPU。
例:
小規模な Composer 環境:
-
worker_cpu = 0.5
-
worker_mem = 2
-
worker_concurrency = min(32, 12*0.5cpu, 8*2gb) = 6
中規模の Composer 環境:
-
worker_cpu = 2
-
worker_mem = 7.5
-
worker_concurrency = min(32, 12*2cpu, 8*7.5gb) = 24
大規模な Composer 環境:
-
worker_cpu = 4
-
worker_mem = 15
-
worker_concurrency = min(32, 12*4cpu, 8*15gb) = 32
ワーカーの自動スケーリング
同時実行のパフォーマンスと環境の自動スケーリング機能は、次の 2 つの設定に関連づけられています。
-
Airflow ワーカーの最小数
-
[celery]worker_concurrency パラメータ
Cloud Composer はタスクキューをモニタリングして、待機しているタスクを請け負う追加のワーカーを生成します。[celery]worker_concurrency を高い値に設定すると、すべてのワーカーが大量のタスクを請け負えるようになるため、状況によってはキューが埋まらず、自動スケーリングがトリガーされないことがあります。
たとえば、2 つの Airflow ワーカーがあり、[celery]worker_concurrency が 100 に設定され、キューにタスクが 200 個ある Cloud Composer 環境では、各ワーカーは 100 個のタスクを請け負います。この場合、キューは空のままになり、自動スケーリングがトリガーされません。これらのタスクの完了に時間がかかると、ワーカー スロットが利用可能になるまで他のタスクが待機するため、結果が出るのが遅れる可能性があります。
別の見方をすると、Composer のスケーリングは、キューに入れられたタスクと実行中のタスクの合計を調べ、この数値を [celery]worker_concurrency で割り、その結果から ceiling() を実行するという仕組みです。実行中のタスクが 11 個、キューに入れられた状態のタスクが 8 個で、[celery]worker_concurrency が 6 に設定されている場合、ワーカーの目標数は ceiling((11+8)/6) = 4 になるため、Composer はワーカーの数を 4 に増やそうとします。
2. Airflow インストール レベルの設定
これは、Cloud Composer によって管理されている Airflow インストールです。これには、スケジューラ、DAG プロセッサ、ウェブサーバー、ワーカー、メタデータ データベースなどのすべての Airflow コンポーネントが含まれます。このレベルは、まだ設定されていない場合は Composer レベルの構成を継承します。
[celery]worker_concurrency: Cloud Composer によって提供されるデフォルト値はほとんどのユースケースに最適ですが、環境によってはカスタム調整が役立つ場合があります。
core.parallelism: Airflow インストール全体で実行されるタスクの最大数。parallelism=0 は無制限を意味します。
core.max_active_runs_per_dag: DAG あたりのアクティブな DAG 実行の最大数
core.max_active_tasks_per_dag: DAG あたりのアクティブな DAG タスクの最大数
キュー
CeleryExecutor を使用する場合、タスクの送信先となる Celery キューを指定できます。キューは BaseOperator の属性であるため、任意のタスクを任意のキューに割り当てることができます。環境のデフォルト キューは、airflow.cfg の celery -> default_queue で定義されます。これで定義されるのは、指定されていない場合にタスクが割り当てられるキューと、Airflow ワーカーが開始時にリッスンするキューです。
プール
Airflow プールを使用すると、任意のタスクセットの実行の並列処理を制限できます。プールのリストは、プールに名前を付け、ワーカー スロットの数を割り当てることによって、UI([メニュー] -> [管理者] -> [プール])で管理されます。ここでは、プールが占有スロットの計算に遅延タスクを含めるかどうかも決定できます。
3. DAG レベルの設定
DAG は Airflow のコアコンセプトであり、タスクをまとめて収集し、依存関係と関係性に基づいて整理することで、タスクの実行方法を指定します。
max_active_runs: この DAG のアクティブな実行の最大数。この上限に達すると、スケジューラはアクティブな DAG 実行を新規作成しません。設定しない場合、デフォルトで core.max_active_runs_per_dag になります。
max_active_tasks: これが設定されている DAG のすべてのアクティブな実行で同時に実行できるタスク インスタンスの数。この設定が定義されていない場合は、環境レベルの設定 max_active_tasks_per_dag の値が使用されます。
4. タスクレベルの設定
Airflow タスクについて
タスク インスタンスの状態は次のいずれかになります。
-
none: タスクはまだ実行キューに入っていません(依存関係がまだ満たされていません)。
-
scheduled: タスクの依存関係が満たされており、実行する必要があるとスケジューラが判断しました。
-
queued: タスクはエグゼキュータに割り当てられており、ワーカーを待機しています。
-
running: タスクはワーカー(またはローカル / 同期エグゼキュータ)上で実行されています。
-
success: タスクはエラーなしで実行を完了しました。
-
restarting: タスクは実行中に外部から再起動を要求されました。
-
failed: タスクの実行中にエラーが発生し、実行に失敗しました。
-
skipped: タスクは、分岐、LatestOnly、または類似の理由でスキップされました。
-
upstream_failed: 上流のタスクが失敗し、トリガールールはそのタスクが必要であると示しています。
-
up_for_retry: タスクは失敗しましたが、再試行回数が残っているので、再スケジュールされます。
-
up_for_reschedule: タスクは再スケジュール モードにあるセンサーです。
-
deferred: タスクはトリガーに渡されました。
- removed: タスクは、実行が開始された後に DAG から消えました。
タスクは、none から、scheduled、queued、running と流れ、最終的に success になることが理想的です。特に指定がない限り、タスクは DAG または Airflow レベルで設定された同時実行構成を継承します。タスク固有の構成には次のものが含まれます。
pool: タスクを実行するプール。プールを使用すると、タスクのサブセットのみの並列処理を制限できます。
max_active_tis_per_dag: タスクごとに dag_runs 全体で同時に実行されるタスク インスタンスの数を制御します。
遅延可能なオペレータとトリガー
標準のオペレータとセンサーは、アイドル状態の場合でも、実行中はワーカー スロットを 1 つ完全に占有します。たとえば、タスクの実行に使用できるワーカー スロットが 100 個しかなく、現在実行中ではあるもののアイドル状態のセンサーを待機している DAG が 100 個ある場合、Airflow クラスター全体が本質的にはアイドル状態であるにもかかわらず、他のものを実行することはできません。
ここで、遅延可能なオペレータが登場します。
遅延可能なオペレータとは、待機する必要があると判断した場合にタスクを一時停止してワーカーを解放し、再開のジョブをトリガーと呼ばれるものに渡す機能を備えて記述されたオペレータです。したがって、一時停止(遅延)中の遅延可能なオペレータによってワーカー スロットが占有されることはないため、アイドル状態のオペレータやセンサーのために無駄になるクラスタ リソースが大幅に少なくなります。なお、デフォルトでは、遅延タスクはプールスロットを使うことはありません。その必要がある場合は、該当するプールを編集してこれを変更できます。
トリガーは、単一の Python プロセスですべて一緒に実行されるように設計された、小さな非同期の Python コードです。非同期であるため、すべてが効率的に共存できます。このプロセスは以下のように機能します。
-
タスク インスタンス(実行中のオペレータ)は、待機する必要があるポイントに到達すると、インスタンスの再開イベントに紐づけられたトリガーに移動します。これによりワーカーは解放され、他の作業を実行できるようになります。
-
新しいトリガー インスタンスは Airflow 内に登録され、triggerer プロセスによって取得されます。
-
トリガーは起動するまで実行され、起動した時点でソースタスクが再スケジュールされます。
-
スケジューラはタスクをワーカーノードで再開するためにキューに入れます。
センサーモード
センサーは主にアイドル状態であるため、より効率的に使用できる実行モードが 2 つあります。
poke(デフォルト): センサーはランタイム全体にわたってワーカー スロットを占有します。
reschedule: センサーはチェックしているときのみワーカー スロットを占有し、チェックの合間には一定期間スリープします。センサーの reschedule モードはこの問題の一部を解決し、センサーを固定間隔でのみ実行できるようにしますが、柔軟性に欠け、再開の理由となるのは時間のみで、他の理由は使用できません。
または、一部のセンサーでは、deferrable=True を設定できます。この場合、プロセスを別の triggerer コンポーネントにオフロードすることで、リソース使用率がさらに向上します。
センサーにおける mode=’reschedule’ と deferrable=True の違い
Airflow では、センサーは特定の条件が満たされるのを待ってから、下流のタスクに進みます。センサーには、アイドル時間を管理するためのオプションが 2 つあります。mode=’reschedule’ と deferrable=True です。mode=’reschedule’ は、Airflow の BaseSensorOperator に固有のパラメータで、条件が満たされない場合にセンサーが再スケジュールされるようにします。一方、deferrable=True は、タスクを後で再試行(または遅延)できることを示すために一部のオペレータが使用する方法ですが、Airflow に組み込まれたパラメータまたはモードではありません。タスクを再試行する実際の動作は、特定のオペレータの実装によって異なる場合があります。
同時実行制限
さまざまな構成設定がどのように相互作用して、同時 DAG 実行またはタスクの数を制限するかを以下の図に示します。
まとめ
これらは上から順に、Cloud Composer での同時実行を完全に制御できる構成です。
Composer 環境
ワーカーの最小数と最大数: ワーカーが増えると、同時に実行できるタスクも増えます。
Airflow インストール
worker_concurrency: 同時実行が多いほど、個々のワーカーが請け負うタスクが多くなります。 値が高いと、すべてのワーカーが大量のタスクを請け負えるようになるため、状況によってはキューが埋まらず、自動スケーリングがトリガーされないことがあります。ほとんどの場合、Composer のデフォルト値を使用します。
parallelism: Airflow インストール全体で実行されるタスクの最大数。parallelism=0 は無制限を意味します。
max_active_runs_per_dag: DAG あたりのアクティブな DAG 実行の最大数。
max_active_tasks_per_dag: DAG あたりのアクティブな DAG タスクの最大数。
DAG
max_active_runs: この DAG のアクティブな実行の最大数。この上限に達すると、スケジューラはアクティブな DAG 実行を新規作成しません。設定しない場合、デフォルトで core.max_active_runs_per_dag になります。
max_active_tasks: これが設定されている DAG のすべてのアクティブな実行で同時に実行できるタスク インスタンスの数。この設定が定義されていない場合は、環境レベルの設定 max_active_tasks_per_dag の値が使用されます。
タスク
max_active_tis_per_dag: タスクごとに dag_runs 全体で同時に実行されるタスク インスタンスの数を制御します。
トラブルシューティングのシナリオ
シナリオ: Composer 環境がワーカーの上限に頻繁に達し、キュー内のタスク数が一貫して多く、DAG が SLA を満たさない。
解決策: Cloud Composer 環境内のワーカーの数を増やすか、自動スケーリングの最小値と最大値を高く設定します。
シナリオ: タスク間のスケジューリングの遅延が長いにもかかわらず、環境がワーカーの最大数にスケールアップされない。
解決策: ワーカーの同時実行([celery]worker_concurrency)を増やします。ワーカーの同時実行数は、環境内のワーカーの最大数で割った、同時実行タスクの予想される最大数よりも高い値に設定する必要があります。
シナリオ: 同じ DAG を何度も並行して実行すると、Airflow によって実行がスロットリングされる。
解決策: DAG あたりの最大アクティブ実行数を増やします(max_active_runs_per_dag、max_active_runs)。
シナリオ: 単一の DAG で多数のタスクを並行して実行すると、Airflow によってタスク実行がスロットリングされる。
解決策: 単一の DAG をできるだけ早く完了したい場合は、DAG の同時実行を増やします(max_active_tasks_per_dag、max_active_tasks)。他の DAG を同時に実行したい場合は、その DAG の max_active_tasks 値または環境レベルの max_active_tasks_per_dag を減らします。また、parallelism が 0(無限大)に設定されていないかどうかを確認します。
シナリオ: 単一の DAG で同じタスクを何度も並行して実行すると、Airflow によってタスクの実行がスロットリングされる。
解決策: タスクの同時実行を増やします(max_active_tasks_per_dag、max_active_tasks、max_active_tis_per_dag)。
シナリオ: タスクが同時に実行されていない。
解決策: Airflow では、parallelism は Airflow ワーカー / Airflow スケジューラで利用可能なリソースと環境構成に依存します。タスクが正確に同時に実行されるという保証はありません。保証できるのは、タスク A、B、C がタスク D より先に完了することのみです。
シナリオ: タスクがスロットリングされている。
解決策: 上記の同時実行制限チャートを確認し、現在の構成をメモします。
シナリオ: センサーがワーカー スロットを占有しすぎている。
解決策: n 秒間隔(つまり、poke_interval の値が 60 未満)でチェックするセンサーには、mode=poke を使用します。n 分間隔(つまり、poke_interval の値が 60 以上)でチェックするセンサーには、mode=reschedule を使用します。mode=reschedule が設定されたセンサーは、次のチェックまでの n 分間、Airflow ワーカー リソースを解放します。さらにパフォーマンスを向上させるには、センサーに deferrable=True を使用することをお勧めします。これにより、センサーモードは無視され、代わりに poke_interval とプロセスが Airflow triggerer に渡され、Airflow ワーカー リソースが他のタスクに解放されます。
次のステップ
Cloud Composer の潜在能力を最大限に引き出すには、Airflow DAG とタスクの同時実行を習得することが不可欠です。コアコンセプトを理解し、環境を効果的に構成して、実用的な最適化戦略を採用することで、最も複雑なデータ パイプラインでも自信を持ってオーケストレートできます。Cloud Composer の詳細をご確認ください。
Cloud Composer、Apache Airflow、およびこのガイドで説明されている情報の詳細については、次のリソースをご覧ください。
-戦略的クラウド エンジニア Christian Yarros