タスクのスケジューリングの問題をデバッグする

Cloud Composer 1 | Cloud Composer 2

このチュートリアルでは、スケジューラの誤動作、エラーとレイテンシの解析、タスクの失敗につながるタスクのスケジューリングと解析に関する問題を診断し、トラブルシューティングする方法について説明します。

はじめに

Airflow スケジューラは、主にタスクのスケジューリングと DAG の解析の 2 つの要因の影響を受けます。これらの要因のいずれかの問題は、環境の健全性とパフォーマンスに悪影響を与える可能性があります。

同時にスケジュール設定されるタスクが多すぎる場合もあります。このような状況では、キューがいっぱいになり、タスクが「スケジュールされた」状態のままになるか、キューに登録された後に再スケジュールされるため、タスクの失敗とパフォーマンスのレイテンシが発生する可能性があります。

もう 1 つの一般的な問題は、DAG コードの複雑さが原因で発生するレイテンシとエラーの解析です。たとえば、コードの最上位に Airflow 変数を含む DAG コードがあると、解析の遅延、データベースの過負荷、スケジューリングの失敗、DAG のタイムアウトが発生する可能性があります。

このチュートリアルでは、サンプル DAG を診断し、スケジューリングと解析の問題のトラブルシューティング、DAG スケジューリングの改善方法、パフォーマンス向上のために DAG コードと環境構成を最適化する方法を学びます。

目標

このセクションでは、このチュートリアルの例の目的を説明します。

例: タスクの同時実行が高いことによるスケジューラの誤動作とレイテンシ

  • 同時に複数回実行されるサンプル DAG をアップロードし、Cloud Monitoring でスケジューラの誤動作やレイテンシの問題を診断します。

  • タスクを統合して DAG コードを最適化し、パフォーマンスへの影響を評価します。

  • 時間の経過とともに均等にタスクを分散し、パフォーマンスへの影響を評価します。

  • Airflow の構成と環境の構成を最適化して、影響を評価します。

例: 複雑なコードによる DAG 解析エラーとレイテンシ

  • Airflow 変数を使用してサンプル DAG をアップロードし、Cloud Monitoring で問題を診断します。

  • コードのトップレベルで Airflow 変数を回避して DAG コードを最適化し、解析時間への影響を評価します。

  • Airflow の構成と環境の構成を最適化し、解析時間への影響を評価します。

費用

このチュートリアルでは、課金対象である次の Google Cloud コンポーネントを使用します。

このチュートリアルを終了した後、作成したリソースを削除すると、それ以上の請求は発生しません。詳しくは、クリーンアップをご覧ください。

準備

このセクションでは、チュートリアルを開始する前に必要なアクションについて説明します。

プロジェクトを作成して構成する

このチュートリアルでは、Google Cloud プロジェクトが必要です。プロジェクトは、次のように構成します:

  1. Google Cloud コンソールで、プロジェクトを選択または作成します:

    プロジェクト セレクタに移動

  2. プロジェクトに対して課金が有効になっていることを確認します。、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。

  3. Google Cloud プロジェクト ユーザーが、必要なリソースを作成するために次のロールを持っていることを確認します。

    • 環境とストレージ オブジェクトの管理者 (roles/composer.environmentAndStorageObjectAdmin)
    • Compute 管理者roles/compute.admin

プロジェクトでAPI を有効にする

Cloud Composer API を有効にします。

API を有効にする

Cloud Composer 環境を作成する

Cloud Composer 2 環境を作成する.

環境の作成の一環としてCloud Composer v2 API サービス エージェント拡張機能roles/composer.ServiceAgentV2Ext)のロールを Composer サービス エージェントに付与します。Cloud Composer では、このアカウントを使用して Google Cloud プロジェクトでオペレーションを実行します。

例: タスクのスケジューリングの問題でスケジューラが誤動作し、タスクが失敗する

この例では、タスクの同時実行性が高いことによるスケジューラの誤動作とレイテンシのデバッグを示します。

サンプル DAG をお使いの環境にアップロードする

前の手順で作成した環境に、次のサンプル DAG をアップロードします。このチュートリアルでは、この DAG の名前は dag_10_tasks_200_seconds_1 です。

この DAG には 200 個のタスクがあります。各タスクは 1 秒間待機して、「Complete!」を出力します。 アップロード後、DAG が自動的にトリガーされます。Cloud Composer はこの DAG を 10 回実行し、すべての DAG 実行が並行して行われます。

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task

tasks_amount = 200
seconds = 1
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2023, 11, 22, 20, 0),
    end_date=datetime(2023, 11, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

スケジューラの誤動作とタスクの失敗の問題を診断する

DAG の実行が完了したら、Airflow UI を開いて dag_10_tasks_200_seconds_1 DAG をクリックします。合計 10 個の DAG 実行が成功し、それぞれに成功したタスクが 200 個あることがわかります。

Airflow タスクのログを確認します。

  1. Google Cloud コンソールで [環境] ページに移動します。

    [環境] に移動

  2. 環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。

  3. [ログ] タブに移動してから、[すべてのログ] > [Airflow ログ] > [ワーカー] > [ログ エクスプローラで表示] に移動します。

ログのヒストグラムで、赤とオレンジ色で示されるエラーと警告を確認できます。

赤とオレンジ色で示されるエラーと警告を含む Airflow ワーカーログのヒストグラム
図 1.Airflow ワーカーログのヒストグラム(クリックして拡大)

サンプル DAG では、約 130 件の警告と 60 件のエラーが発生しています。黄色と赤色のバーを含む列をクリックします。ログに、次の警告とエラーが表示されます。

State of this instance has been externally set to success. Terminating
instance.

Received SIGTERM. Terminating subprocesses.

worker: Warm shutdown (MainProcess).

これらのログは、リソース使用量が上限を超えたためにワーカーが再起動したことを示す可能性があります。

Airflow タスクがキューに長時間保持されると、スケジューラは failed および up_for_retry とマークし、実行のために再スケジュールします。この状況の症状を監視する 1 つの方法は、キューに入れられたタスクの数をグラフで確認し、このグラフのスパイクが約 10 分以内に低下しない場合は、タスクが失敗した可能性があります(ログなし)。

モニタリング情報を確認します。

  1. [モニタリング] タブに移動して、[概要] を選択します。

  2. [Airflow タスク] のグラフを確認します。

    キューに入れられたタスク数の急増を示す、経時的な Airflow タスクのグラフ
    図 2.Airflow タスクのグラフ(クリックして拡大)

    Airflow タスクのグラフで、10 分以上続くキューに入れられたタスクの急増が発生しています。これは、スケジュールされているすべてのタスクを処理するのに十分なリソースが環境に含まれていないことを意味する可能性があります。

  3. アクティブ ワーカーのグラフを確認します。

    アクティブなワーカーの数が最大上限まで増加したことを示す、経時的なアクティブな Airflow ワーカーのグラフ
    図 3.アクティブ ワーカーのグラフ(クリックして拡大)

    [アクティブ ワーカー] グラフは、DAG の実行中に DAG が 3 つのワーカーの上限まで自動スケーリングをトリガーしたことを示します。

  4. リソース使用量のグラフは、キューに入れられたタスクを実行するための容量が Airflow ワーカーにない可能性があります。[モニタリング] タブで、[ワーカー] を選択し、[ワーカーの合計 CPU] と [ワーカーの合計メモリ使用量] のグラフを確認します。

    CPU 使用率が最大上限まで増加したことを示す、Airflow ワーカーによる CPU 使用率のグラフ
    図 4.ワーカーの合計 CPU 使用率のグラフ(クリックして拡大)
    メモリ使用量は増加しているものの最大上限に達していないことを示す、Airflow ワーカーによるメモリ使用量のグラフ
    図 5.ワーカーの合計メモリ使用量のグラフ(クリックして拡大)

    グラフでは、過剰な数のタスクを同時に実行した結果、CPU の上限に達したことが示されています。リソースが 30 分以上使用されています。これは、10 回の DAG 実行で 1 つずつ実行される 200 回のタスクの合計時間よりも長くなっています。

これらは、キューがいっぱいになっていることと、スケジュールされたすべてのタスクを処理するためのリソースが不足していることを示すものです。

タスクを統合する

現在のコードでは、すべてのタスクを並行して処理するのに十分なリソースがない DAG とタスクが数多く作成されるため、キューがいっぱいになります。 タスクをキューに長時間保持すると、タスクのスケジュール変更や失敗が発生する可能性があります。 このような状況では、より少数の統合されたタスクを選ぶ必要があります。

次のサンプル DAG では、最初の例のタスク数を 200 から 20 に変更し、待機時間を 1 秒から 10 秒に増やして、同じ量の処理を行うより統合されたタスクをシミュレートします。

作成した環境に、次のサンプル DAG をアップロードします。このチュートリアルでは、この DAG の名前は dag_10_tasks_20_seconds_10 です。

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task

tasks_amount = 20
seconds = 10
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

より統合されたタスクがスケジューリング プロセスに与える影響を評価します。

  1. DAG の実行が完了するまで待ちます。

  2. Airflow UI の [DAG] ページで、dag_10_tasks_20_seconds_10 DAG をクリックします。10 個の DAG 実行が表示され、それぞれに成功したタスクが 20 個あります。

  3. Google Cloud コンソールで [環境] ページに移動します。

    [環境] に移動

  4. 環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。

  5. [ログ] タブに移動してから、[すべてのログ] > [Airflow ログ] > [ワーカー] > [ログ エクスプローラで表示] に移動します。

    より統合されたタスクを使用した 2 番目の例では、警告が約 10 件、エラーが 7 件でした。ヒストグラムで、最初の例(前の値)と 2 番目の例(後の値)でのエラーと警告の数を比較できます。

    タスクが統合された後にエラーと警告の数が減少したことを示す、エラーと警告を含む Airflow ワーカーログのヒストグラム
    図 6.タスクが統合された後の Airflow ワーカーログのヒストグラム(クリックして拡大)

    最初の例をより統合された例と比較すると、2 番目の例でエラーと警告が大幅に減少していることがわかります。ただし、リソースの過負荷により、ウォーム シャットダウンに関連する同じエラーがログに引き続き表示されます。

  6. [モニタリング] タブで、[ワーカー] を選択してグラフを確認します。

    最初の例の Airflow タスクのグラフ(前の値)と、より統合されたタスクによる 2 番目の例のグラフを比較すると、キューに入れられたタスクのスパイクは、タスクが統合された場合により短い時間になっていることがわかります。しかし、スパイクは 10 分近く、これは最適ではありません。

    Airflow タスクのスパイクが以前より短くなったことを示す、経時的な Airflow タスクのグラフ。
    図 7.タスクが統合された後の Airflow タスクのグラフ(クリックして拡大)

    アクティブ ワーカーのグラフでは、両方の例が同じ量の作業をシミュレートしているにも関わらず、最初の例(グラフの左側)が 2 番目の例よりもはるかに長い時間リソースを使用したことがわかります。

    アクティブなワーカーの数が以前より短時間で増加したことを示す、経時的なアクティブな Airflow ワーカーのグラフ。
    図 8.タスクが統合された後のアクティブなワーカーのグラフ(クリックして拡大)

    ワーカー リソース消費量のグラフを確認します。より統合されたタスクを使用した例と最初の例で使用したリソースの間に大きな差があるにも関わらず、CPU 使用率は上限の 70% まで急上昇しています。

    CPU 使用率が上限の 70% まで増加していることを示す、Airflow ワーカーによる CPU 使用率のグラフ
    図 9.タスクが統合された後のワーカーの合計 CPU 使用率のグラフ(クリックして拡大)
    メモリ使用量は増加しているものの最大上限に達していないことを示す、Airflow ワーカーによるメモリ使用量のグラフ
    図 10.タスクが統合された後のワーカーの合計メモリ使用量のグラフ(クリックして拡大)

時間の経過とともにタスクを均等に分散する

同時実行タスクが多すぎると、キューがいっぱいになり、タスクがキュー内で停止するか、再スケジュールされます。前のステップでは、これらのタスクを統合してタスクの数を減らしましたが、出力ログとモニタリングでは、同時タスクの数が最適ではないことが示されました。

スケジュールを実装するか、同時に実行できるタスク数の上限を設定することで、同時タスクの数を制御できます。

このチュートリアルでは、DAG レベルのパラメータを dag_10_tasks_20_seconds_10 DAG に追加して、時間の経過とともにタスクをより均等に分散します。

  1. DAG コンテキスト マネージャーに max_active_runs=1 引数を追加します。この引数により、特定の時点で DAG のインスタンスを 1 つだけ実行する制限を設定します。

  2. DAG コンテキスト マネージャーに max_active_tasks=5 引数を追加します。この引数により、各 DAG で同時に実行できるタスク インスタンスの最大数が制御されます。

作成した環境に、次のサンプル DAG をアップロードします。このチュートリアルでは、この DAG の名前は dag_10_tasks_20_seconds_10_scheduled.py です。

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task

tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    max_active_runs=active_runs,
    max_active_tasks=active_tasks,
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

時間の経過とともにタスクの分散がスケジューリング プロセスに与える影響を評価します。

  1. DAG の実行が完了するまで待ちます。

  2. Google Cloud コンソールで [環境] ページに移動します。

    [環境] に移動

  3. 環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。

  4. [ログ] タブに移動してから、[すべてのログ] > [Airflow ログ] > [ワーカー] > [ログ エクスプローラで表示] に移動します。

  5. ヒストグラムで、アクティブなタスクの数と実行数が限られた 3 番目の DAG では、警告やエラーが生成されず、ログの分布が以前の値と比較してより均等に見えることがわかります。

    タスクが統合され時間の経過とともに分散された後に、エラーと警告が表示されない、エラーと警告を含む Airflow ワーカーログのヒストグラム。
    図 11.タスクが統合され、時間の経過とともに分散された後の Airflow ワーカー ログのヒストグラム(クリックして拡大)

アクティブなタスクの数と実行数が限られた dag_10_tasks_20_seconds_10_scheduled の例でのタスクでは、タスクが均等にキューに入れられるため、リソース不足は発生しませんでした。

前述の手順を実施した後、小規模なタスクを統合して、時間の経過とともにより均等に分散させることで、リソース使用量を最適化しました。

環境の構成を最適化する

環境の構成を調整して、キューに入れられたタスクを実行するための Airflow ワーカーの容量が常に確保されるようにできます。

ワーカーの数とワーカーの同時実行

ワーカーの最大数を調整して、設定した制限内で Cloud Composer が環境を自動的にスケーリングするようにできます。

[celery]worker_concurrency パラメータは、1 つのワーカーがタスクキューから受け取ることができるタスクの最大数を定義します。このパラメータを変更すると、1 つのワーカーが同時に実行できるタスクの数を調整できます。 この Airflow 構成オプションは、オーバーライドすることで変更できます。デフォルトでは、ワーカーの同時実行は最小値の 32, 12 * worker_CPU, 8 * worker_memory に設定されています。つまり、ワーカー リソースの上限に依存します。デフォルトのワーカー同時実行値の詳細については、環境の最適化をご覧ください。

ワーカーの数とワーカーの同時実行は互いに組み合わせられ、環境のパフォーマンスは両方のパラメータに大きく依存します。 次の組み合わせを使用して正しい組み合わせを選択できます。

  • 並列で実行される複数のクイックタスク。キューで待機しているタスクがあり、ワーカーの CPU とメモリの使用率が低い場合、ワーカーの同時実行数を増やすことができます。ただし、特定の状況ではキューがいっぱいにならないため、自動スケーリングがトリガーされません。新しいワーカーの準備が整うまでに小規模なタスクの実行が終了した場合、既存のワーカーが残りのタスクを受け取り、新しく作成されたワーカーにタスクがなくなります。

    そのような状況では、過大なスケーリングを避けるために、ワーカーの最小数を増やし、ワーカーの同時実行を増やすことをおすすめします。

  • 並列で実行される複数の長いタスク。ワーカーの同時実行性が高いと、システムがワーカー数のスケーリングを行えなくなります。複数のタスクがリソースを大量に消費し、完了に時間がかかる場合、ワーカーの同時実行が高いと、キューがいっぱいにならず、1 つのワーカーのみがすべてのタスクを受け取ることになり、パフォーマンスの問題が発生します。このような状況では、ワーカーの最大数を増やし、ワーカーの同時実行を減らすことをおすすめします。

並列処理の重要性

Airflow スケジューラは、DAG 実行のスケジューリングと DAG の個々のタスクを制御します。[core]parallelism Airflow 構成オプションは、これらのタスクのすべての依存関係が満たされた後に、Airflow スケジューラがエグゼキュータのキュー内にキューに入れるタスクの数を制御します。

並列処理は Airflow の保護メカニズムです。これにより、ワーカー数に関係なく、各スケジューラで同時に実行できるタスクの数が決まります。 並列処理の値にクラスタ内のスケジューラ数を掛けた値が、環境内のキューに入れられるタスク インスタンスの最大数になります。

通常、[core]parallelism は、ワーカーの最大数と [celery]worker_concurrency の積として設定されます。また、プールの影響を受けます。 この Airflow 構成オプションは、オーバーライドすることで変更できます。スケーリングに関連する Airflow 構成の調整の詳細については、Airflow 構成のスケーリングをご覧ください。

最適な環境構成を見つける

スケジューリングの問題を修正するために推奨される方法は、小さなタスクをより大きなタスクに統合し、時間の経過とともに均等にタスクを分散することです。DAG コードの最適化に加えて、複数のタスクを同時に実行するのに十分な容量を持つように環境構成を最適化することもできます。

たとえば、DAG のタスクをできる限り統合するものの、時間の経過とともに均等にタスクを分散するようアクティブなタスクを制限することは、特定のユースケースの望ましいソリューションでない場合を考えます。

並列処理、ワーカー数、ワーカー同時実行のパラメータを調整して、アクティブなタスクを制限することなく dag_10_tasks_20_seconds_10 DAG を実行できます。この例では、DAG が 10 回実行され、実行ごとに 20 個の小さなタスクが含まれています。 すべてを同時に実行する場合は、次のようにします。

  • 環境のマネージド Cloud Composer インフラストラクチャのパフォーマンス パラメータを制御するため、より大きな環境サイズが必要になります。

  • Airflow ワーカーは、20 個のタスクを同時に実行できる必要があります。つまり、ワーカーの同時実行を 20 に設定する必要があります。

  • ワーカーには、すべてのタスクを処理するのに十分な CPU とメモリが必要です。ワーカーの同時実行はワーカーの CPU とメモリの影響を受けます。したがって、CPU に少なくとも worker_concurrency / 12 とメモリに least worker_concurrency / 8 が必要です。

  • ワーカーの同時実行に合わせて、並列処理を増やす必要があります。 ワーカーがキューから 20 個のタスクを受け取るためには、スケジューラは最初にその 20 個のタスクをスケジュールする必要があります。

次の方法で環境構成を調整します。

  1. Google Cloud コンソールで [環境] ページに移動します。

    [環境] に移動

  2. 環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。

  3. [環境の設定] タブに移動します。

  4. [リソース] > [ワークロード] の構成を見つけて、[編集] をクリックします。

  5. [ワーカー] セクションの [メモリ] フィールドに、Airflow ワーカーの新しいメモリ上限を指定します。このチュートリアルでは 4 GB を使用します。

  6. [CPU] フィールドで、Airflow ワーカーの新しい CPU 上限を指定します。このチュートリアルでは、2 つの vCPU を使用します。

  7. 変更を保存し、Airflow ワーカーが再起動するまで数分かかります。

次に、Airflow の並列処理とワーカー同時実行の構成オプションをオーバーライドします。

  1. [Airflow 構成のオーバーライド] タブに移動します。

  2. [編集] をクリックしてから、[Airflow 構成のオーバーライドを追加] をクリックします。

  3. 並列処理の構成をオーバーライドします。

    セクション キー
    core parallelism 20
  4. [Airflow 構成のオーバーライドを追加] をクリックして、ワーカーの同時実行の構成をオーバーライドします。

    セクション キー
    celery worker_concurrency 20
  5. [保存] をクリックし、環境の構成が更新されるまで待ちます。

調整済みの構成を使用して、同じ例の DAG を再度トリガーします。

  1. Airflow ツールバーで、[DAG] ページに移動します。

  2. dag_10_tasks_20_seconds_10 DAG を見つけて削除します。

    DAG が削除されると、Airflow は環境のバケット内の DAG フォルダをチェックし、自動的に DAG を再度実行します。

DAG の実行が完了したら、ログのヒストグラムをもう一度確認します。この図から、調整された環境の構成で実行したときに、より統合されたタスクを使用した dag_10_tasks_20_seconds_10 の例では、エラーと警告が生成されなかったことがわかります。結果を図の以前のデータと比較します。ここでは、デフォルトの環境構成で実行したときにエラーと警告が生成されています。

環境の構成が調整された後に、エラーと警告が表示されない、エラーと警告を含む Airflow ワーカーログのヒストグラム
図 12.環境構成が調整された後の Airflow ワーカーログのヒストグラム(クリックして拡大)

環境の構成と Airflow の構成はタスクのスケジュール設定において重要な役割を果たしますが、特定の上限を超えて構成を増やすことはできません。

DAG コードを最適化し、タスクを統合して、パフォーマンスと効率を最適化するためにスケジューリングを使用することをおすすめします。

例: 複雑な DAG コードによる DAG 解析エラーとレイテンシ

この例では、過剰な Airflow 変数をシミュレートするサンプル DAG の解析レイテンシを調査します。

新しい Airflow 変数を作成する

サンプルコードをアップロードする前に、新しい Airflow 変数を作成します。

  1. Google Cloud コンソールで [環境] ページに移動します。

    [環境] に移動

  2. [Airflow ウェブサーバー] 列で、ご使用の環境の [Airflow] リンクをクリックします。

  3. [管理者] > [変数] > [新しいレコードを追加] に移動します。

  4. 次の値を設定します。

    • key: example_var
    • val: test_airflow_variable

サンプル DAG をお使いの環境にアップロードする

前の手順で作成した環境に、次のサンプル DAG をアップロードします。このチュートリアルでは、この DAG の名前は dag_for_loop_airflow_variable です。

この DAG には、1,000 回実行され、過剰な Airflow 変数をシミュレートする for ループが含まれています。イテレーションごとに example_var 変数が読み取られ、タスクが生成されます。各タスクには、変数の値を出力するコマンドが 1 つ含まれています。

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable',
    default_args=default_args,
    catchup=False,
    schedule_interval="@daily"
)

for i in range(1000):
    a = Variable.get('example_var', 'N/A')
    task = BashOperator(
        task_id=f'task_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': a}
    )

解析の問題を診断する

DAG 解析時間は、Airflow スケジューラが DAG ファイルを読み取り、解析するのにかかる時間です。Airflow スケジューラは、DAG からタスクのスケジュールを設定する前に、DAG ファイルを解析して DAG の構造と定義済みのタスクを検出する必要があります。

DAG の解析に長い時間がかかると、スケジューラの容量が消費され、DAG 実行のパフォーマンスが低下する可能性があります。

DAG の解析時間をモニタリングするには:

  1. gcloud CLI で dags report Airflow CLI コマンドを実行して、すべての DAG の解析時間を確認します。

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        dags report
    

    次のように置き換えます。

    • ENVIRONMENT_NAME: 環境の名前。
    • LOCATION: 環境が配置されているリージョン。
  2. コマンドの出力で、dag_for_loop_airflow_variables DAG の期間値を探します。値が大きい場合、この DAG が最適な方法で実装されていない可能性があります。複数の DAG がある場合は、出力テーブルから、解析時間が長い DAG を確認できます。

    例:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:14.773594 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /airflow_monitoring | 0:00:00.003035 | 1       | 1        | airflow_monitoring
    .py
    
    
  3. Google Cloud コンソールで DAG 解析時間を検査します。

    1. Google Cloud コンソールで [環境] ページに移動します。

    [環境] に移動

  4. 環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。

  5. [ログ] タブに移動してから、[すべてのログ] > [DAG プロセッサ マネージャー] に移動します。

  6. dag-processor-manager ログを確認し、問題かあるかどうか確認します。

    DAG 解析時間が 46.3 秒であることを示す、サンプル DAG のログエントリ
    図 13.DAG 解析時間を表示する DAG プロセッサ マネージャーのログ(クリックして拡大)

DAG の解析時間の合計が約 10 秒を超えると、スケジューラが DAG の解析で過負荷状態になり、DAG を効果的に実行できなくなる可能性があります。

DAG コードを最適化する

DAG で不要な「トップレベル」の Python コードを避けることをおすすめします。DAG 外部からのインポート、変数、関数が多い DAG では、Airflow スケジューラの解析時間が長くなります。これにより、Cloud Composer と Airflow のパフォーマンスとスケーラビリティが低下します。Airflow 変数の読み取りが多すぎると、解析時間が長くなり、データベースの負荷が高くなります。DAG ファイルに上記のコードが含まれている場合、すべてのスケジューラのハートビートでこれらの関数が実行され、実行が遅くなる場合があります。

Airflow のテンプレート フィールドを使用して、Airflow 変数の値と Jinja テンプレートに含まれる値を DAG に取り込むことができます。これにより、スケジューラのハートビート中の不要な関数実行が防止されます。

DAG の例をより適切な方法で実装するには、DAG の最上位 Python コード内で Airflow 変数を使用しないでください。代わりに、Jinja テンプレートを使用して既存のオペレーターに Airflow 変数を渡します。これにより、タスクの実行まで値の読み取りが遅延します。

環境にサンプルの DAG の新しいバージョンをアップロードします。このチュートリアルでは、この DAG の名前は dag_for_loop_airflow_variable_optimized です。

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable_optimized',
    default_args=default_args,
    catchup=False,
    schedule_interval='@daily'
)

for i in range(1000):
    task = BashOperator(
        task_id=f'bash_use_variable_good_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': '{{ var.value.get("example_var") }}'},
    )

新しい DAG の解析時間を調べます。

  1. DAG の実行が完了するまで待ちます。

  2. もう一度 dags report コマンドを実行して、すべての DAG の解析時間を確認します。

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:37.000369 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /dag_for_loop_airfl | 0:00:01.109457 | 1       | 1000     | dag_for_loop_airflow
    ow_variable_optimiz |                |         |          | _variable_optimized
    ed.py               |                |         |          |
    /airflow_monitoring | 0:00:00.040510 | 1       | 1        | airflow_monitoring
    .py                 |                |         |          |
    
  3. 再度 dag-processor-manager ログを確認し、解析時間を分析します。

    DAG 解析時間が 4.21 秒であることを示す、サンプル DAG のログエントリ
    図 14.DAG コードが最適化された後の DAG 解析時間を表示する DAG プロセッサ マネージャーのログ(クリックして拡大)

環境変数を Airflow テンプレートに置き換えることで、DAG コードを簡素化し、解析のレイテンシを約 10 分の 1 に短縮できます。

Airflow 環境の構成を最適化する

Airflow スケジューラは、常に新しいタスクをトリガーし、環境バケット内のすべての DAG を解析します。DAG の解析時間が長く、スケジューラが大量のリソースを消費している場合は、スケジューラがリソースをより効率的に使用できるように、Airflow スケジューラ構成を最適化できます。

このチュートリアルでは、DAG ファイルの解析に時間がかかり、解析サイクルが重複し始め、スケジューラの容量を使い果たします。この例では、最初の DAG の例では解析に 5 秒以上かかるため、リソースをより効率的に使用するためにスケジューラの実行頻度を少なく構成します。scheduler_heartbeat_sec Airflow 構成オプションをオーバーライドします。この構成では、スケジューラの実行頻度(秒単位)を定義します。デフォルトでは、値は 5 秒に設定されています。 この Airflow 構成オプションは、オーバーライドすることで変更できます。

scheduler_heartbeat_sec Airflow 構成オプションをオーバーライドします。

  1. Google Cloud コンソールで [環境] ページに移動します。

    [環境] に移動

  2. 環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。

  3. [Airflow 構成のオーバーライド] タブに移動します。

  4. [編集] をクリックしてから、[Airflow 構成のオーバーライドを追加] をクリックします。

  5. Airflow 構成オプションをオーバーライドします。

    セクション キー
    scheduler scheduler_heartbeat_sec 10
  6. [保存] をクリックし、環境の構成が更新されるまで待ちます。

スケジューラの指標を確認します。

  1. [モニタリング] タブに移動し、[スケジューラ] を選択します。

  2. [スケジューラ ハートビート] のグラフで、[その他のオプション] ボタン(3 つのドット)をクリックしてから、[Metrics Explorer で表示] をクリックします。

ハートビートの発生頻度が低いことを示すスケジューラのハートビート グラフ
図 15.スケジューラのハートビート グラフ(クリックして拡大)

グラフでは、デフォルト構成を 5 秒から 10 秒に変更した後、スケジューラの実行頻度が 2 倍少ないことがわかります。ハートビートの頻度を減らすことで、前の解析サイクルの進行中にスケジューラの実行が開始されず、スケジューラのリソース容量が使い果たされないようにすることができます。

スケジューラにさらにリソースを割り当てる

Cloud Composer 2 では、スケジューラにより多くの CPU とメモリリソースを割り当てることができます。このようにして、スケジューラのパフォーマンスを向上させ、DAG の解析時間を加速できます。

スケジューラに追加の CPU とメモリを割り当てます。

  1. Google Cloud コンソールで [環境] ページに移動します。

    [環境] に移動

  2. 環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。

  3. [環境の設定] タブに移動します。

  4. [リソース] > [ワークロード] の構成を見つけて、[編集] をクリックします。

  5. [スケジューラ] セクションの [メモリ] フィールドに、新しいメモリ上限を指定します。このチュートリアルでは 4 GB を使用します。

  6. [CPU] フィールドに新しい CPU 上限を指定します。このチュートリアルでは、2 つの vCPU を使用します。

  7. 変更を保存し、Airflow スケジューラが再起動するまで数分かかります。

  8. [ログ] タブに移動してから、[すべてのログ] > [DAG プロセッサ マネージャー] に移動します。

  9. dag-processor-manager ログを確認して、サンプル DAG の解析時間を比較します。

    最適化された DAG の DAG 解析時間が 1.5 秒であることを示すサンプル DAG のログエントリ。最適化されていない DAG の場合、解析時間は 28.71 秒
    図 16.スケジューラにより多くのリソースが割り当てられた後の DAG 解析時間を示す、DAG プロセッサ マネージャーのログ(クリックして拡大)

スケジューラにより多くのリソースを割り当てることで、スケジューラの容量が増え、デフォルトの環境構成と比較して解析のレイテンシが大幅に短縮されました。スケジューラがより多くの DAG をより速く解析できるようになりますが、Cloud Composer リソースに関連する費用も増加します。また、特定の上限を超えてリソースを増やすことはできません。

使用可能な DAG コードと Airflow 構成の最適化が実装された後にのみリソースを割り当てることをおすすめします。

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。

プロジェクトの削除

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

リソースを個別に削除する

複数のチュートリアルとクイックスタートを実施する予定がある場合は、プロジェクトを再利用すると、プロジェクトの割り当て上限を超えないようにできます。

Cloud Composer 環境を削除します。この手順中に環境のバケットも削除します。

次のステップ