コンテンツに移動
データ分析

Airflow DAG の改良による Cloud Composer の最適化

2023年2月2日
Google Cloud Japan Team

※この投稿は米国時間 2023 年 1 月 21 日に、Google Cloud blog に投稿されたものの抄訳です。

データ パイプラインのホスティング、オーケストレーション、管理は、どの企業にとっても複雑なプロセスです。Google Cloud が提供している Cloud Composer というフルマネージドのワークフロー オーケストレーション サービスを利用すれば、企業はクラウドとオンプレミス データセンターにまたがるワークフローの作成、スケジューリング、モニタリング、管理に対応できます。Cloud Composer は広く利用されている Apache Airflow のオープンソース プロジェクトを基に構築されており、Python プログラミング言語を使用して動作します。Apache Airflow では、ユーザーがタスクの有向非巡回グラフ(DAG)を作成できるようになっています。この DAG は、特定の間隔で実行されるようにスケジュールすることも、外部イベントによってトリガーすることも可能です。

このガイドには、Apache Airflow DAG を作成する際に一般的に適用できる、作業項目のチェックリストが掲載されています。これらのチェック項目は、Google Cloud とオープンソース コミュニティが判断したベスト プラクティスに沿ったものとなっています。一連の高パフォーマンスの DAG によって Cloud Composer の動作が最適化され、標準化された作成方法によって、デベロッパーが数百個、あるいは数千個の DAG でも管理できるようになります。チェック項目のそれぞれが、Cloud Composer 環境と開発プロセスにメリットをもたらします。

はじめに

  1. ファイル名を標準化します。作成した DAG ファイルのコレクションを他のデベロッパーが容易に参照できるようにするためです。
    a. 例: team_project_workflow_version.py
  2. DAG は決定的でなければなりません
    a. 特定の入力によって常に同じ出力が生成される必要があります。
  3. DAG はべき等でなければなりません
    a. DAG を何度トリガーしても、毎回同じ効果 / 結果が得られなければなりません。
  4. タスクはアトミック、かつ、べき等でなければなりません
    a. タスクごとに、他のオペレーションとは独立して再実行できる 1 つのオペレーションを処理するようにします。タスクがアトミックな場合、そのタスクの一部が成功したことは、タスク全体が成功したことを意味します。
  5. 可能な限り単純な DAG にします。
    a. タスク間の依存関係が少ない単純な DAG にすると、オーバーヘッドが少なくなるため、スケジューリングのパフォーマンスが向上する傾向があります。一般的に、多数の依存関係がある深くネストされたツリー構造よりも線形構造(例: A -> B -> C)にしたほうが、効率的な DAG になります。
  6. Python の docstring 規則に従って、各関数のファイルの上部にドキュメントを記述してください。
    a. Python の docstring 規則は、他のデベロッパーやプラットフォーム エンジニアが Airflow DAG を理解するために役立ちます。
    b. 関数と同様に BashOperator のドキュメントも作成するようにしてください。DAG で bash スクリプトを参照している場合、スクリプトの目的を記したドキュメントがないと、このスクリプトに詳しくないデベロッパーにはトラブルシューティングが困難です。

DAG の作成を標準化する

7.default_args にオーナーを追加します。
a. デベロッパーのメールアドレス / ID を使用するか、配布リスト / チーム名を使用するかを決定します。

  1. dag = DAG() ではなく with DAG() as dag を使用します。
    a. すべてのオペレーターまたはタスクグループに DAG オブジェクトを渡す必要がなくなるようにします。
  2. DAG ID 内にバージョンを設定します。
    a. DAG 内のコードを変更するたびにバージョンを更新します。
    b. こうすると、削除されたタスクログが UI に表示されなくなることや、ステータスのないタスクが古い DAG 実行に対して生成されること、DAG 変更時の一般的な混乱を防ぐことができます。
    c. Airflow オープンソースには、将来的にバージョニングが実装される予定です。

10.DAG にタグを追加します。
a. デベロッパーがタグをフィルタして Airflow UI をナビゲートできるようにします。
b. 組織、チーム、プロジェクト、アプリケーションなどを基準に DAG をグループ化します。

  1. DAG の説明を追加します。
    a. 他のデベロッパーが自分の DAG の内容を理解できるようにします。
  2. 作成時には DAG を一時停止します。
    a. こうすると、誤って DAG が実行されて Cloud Composer 環境の負荷が増すという事態を回避できます。
  3. catchup=False に設定して、自動キャッチアップによる Cloud Composer 環境の過負荷を避けます。
  4. DAG が完了せずに Cloud Composer 環境のリソースが保持されることや、再試行時に競合が引き起こされることのないよう、 dagrun_timeout を設定します。
  5. インスタンス化で DAG に引数を渡し、すべてのタスクにデフォルトで同じ start_date が設定されるようにします。
  6. DAG では静的な start_date を使用します
    a. 動的な start_date を使用した場合、誤った開始日が導き出され、失敗したタスク インスタンスやスキップされた DAG 実行を消去するときにエラーが発生する可能性があります。
  7. retries を、DAG レベルで適用される default_arg として設定します。これよりも細かいレベルで設定するのは、特定のタスクで必要な場合に限定してください。
    1. 適切な再試行回数は 1~4 回です。再試行回数が多すぎると、Cloud Composer 環境に不要な負荷がかかります。
読み込んでいます...

18. 以下のコールバック関数のそれぞれで行う処理を定義します(メールの送信、コンテキストのロギング、Slack チャネルへのメッセージ送信など)。DAG によっては、何もしなくても問題がない場合があります。
a. success
b. failure
c. sla_miss
d. retry

例:

読み込んでいます...

19. タスクグループを使用してタスクを整理します。

例:

読み込んでいます...

20.オペレーターの変数名と task_id の引数を一致させる必要があります。

読み込んでいます...

21. タスクに SLA を追加します。
a. SLA をタスクレベルに設定し、タスクの実行時間が想定より長い場合は通知を受け取るようにします。

読み込んでいます...

Composer 環境の負荷を軽減する

22. Python 関数ではなく Jinja テンプレート / マクロを使用します。
a. Airflow のテンプレート フィールドを使用して、環境変数の値と Jinja テンプレートに含まれる値を DAG に取り込むことができます。こうすると、DAG をべき等にできるとともに(つまり、何度呼び出しても結果が変わらないこと)、スケジューラのハートビート中に不要な関数が実行されることを防止できます。
b. The Airflow エンジンはデフォルトで、すべてのテンプレート内でアクセスできる変数を渡します。

以下の例ではベスト プラクティスに反し、Python 関数 datetime に基づく変数を定義しています。

読み込んでいます...

DAG ファイルに上記のコードが含まれている場合、スケジューラがハードビートを送信するたびに、これらの関数が実行されますが、パフォーマンスが低い可能性があります。さらに重要なことに、これではべき等の DAG にはなりません。datetime.today() は DAG 実行の日付ではなく現在の日付を基準とするため、過去の日付に対して失敗した DAG を再実行することはできません。

これよりも適切な実装方法は、次のように Airflow 変数を使用することです。

読み込んでいます...

23. 独自の Airflow 変数を作成することは避けます。
a. 独自に作成した変数はメタデータ データベースに格納されるため、変数を取得するにはデータベース接続が必要になります。こうすると、Cloud Composer 環境のパフォーマンスに影響を与えかねません。そのため、環境関数または Google Cloud シークレットを使用してください。

24.すべての DAG をまったく同じスケジュールで実行しないようにします(可能な限り、ワークロードを分散して実行します)。
a. スケジュール間隔には、Airflow のマクロや time_deltas ではなく cron 式を優先して使用するようにします。こうすると、より厳密なスケジュールを指定できるとともに、1 日の中でワークロードを分散して実行しやすくなり、Cloud Composer 環境への負荷が軽減されます。
b. 特定の cron スケジュール式を生成するには、Crontab.guru が役立ちます。例についてはこちらでご確認ください。

例:

読み込んでいます...

25. 少量のデータの場合を除き、XComs を使用しないようにします。
a. 使用すると、保存量もデータベースへの接続数も増えます。
b. どうしても必要な場合は、値として JSON 辞書を使用します(辞書内の多数の値に対して 1 つの接続)

26. DAG / Google Cloud Storage のパスに不要なオブジェクトを追加しないようにします。
a. 追加しなければならない場合は、GCS のパスに、Airflow スケジューラが解析する必要のない .airflowignore ファイルを追加します(SQL、プラグインなどの場合)。

27. タスクの実行タイムアウトを設定します。

例:

読み込んでいます...

28. 可能な場合は、センサーではなく遅延可能な演算子を使用します。
a. 遅延可能な演算子は、待機の必要があることを把握すると自身を一時停止してワーカーを解放し、再開対象のジョブをトリガーに引き渡します。したがって、一時停止(遅延)中の遅延可能な演算子によってワーカー スロットが占有されることはないため、アイドル状態の演算子やセンサーで浪費されるクラスタ リソースの数 / 量が少なくなります。

例:

読み込んでいます...

29. センサーを使用するときは、必ず mode、poke_interval、timeout を定義します。
a. センサーを使用するには、Airflow ワーカーが実行されている必要があります。
b. n 秒間隔(つまり、poke_interval の値が 60 未満)でチェックするセンサーには、mode=poke を使用します。mode=poke が設定されたセンサーは、n 秒間隔で継続的にポーリングし、Airflow ワーカー リソースを保持します。
c. n 分間隔(つまり、poke_interval の値が 60 以上)でチェックするセンサーには、mode=reschedule を使用します。mode=reschedule が設定されたセンサーは、次のチェックまでの n 分間、Airflow ワーカー リソースを解放します。

例:

読み込んでいます...

30. Cloud Composer 環境への負荷を最小限にするために、処理を外部サービスBigQueryDataprocCloud Functions など)にオフロードします。
a. 一般的に、これらのサービスではそれぞれに固有の Airflow オペレーターを使用します。

31. サブ DAG を使用してはなりません。
a. サブ DAG は、DAG 内に再利用可能なタスクのグループを作成するための機能として古いバージョンの Airflow で使用されていました。しかし、パフォーマンスと機能上の問題の原因となっていたため、Airflow 2.0 でサブ DAG は非推奨となりました。

32. DAG 間の依存関係には Pub/Sub を使用します。
a. マルチクラスタ / DAG 間の依存関係の例をご確認ください。

33. DAG が迅速に読み込まれるようにします。
a. 不要な「最上位レベル」の Python コードは使用しないようにします。DAG 外部からの多数のインポート、変数、関数を使用する DAG では、Airflow スケジューラの解析に時間がかかるため、Cloud Composer / Airflow のパフォーマンスとスケーラビリティが低下します。
b. インポートと関数を DAG 内に移動すると、解析時間を(数秒単位にまで)短縮できます。
c. 開発された DAG によって DAG 解析時間が過剰に増加していないことを確認します。

例:

読み込んでいます...

開発とテストを効率化する

34. 「セルフチェック」を実装します(センサーまたは遅延可能な演算子を使用)。
a. タスクが想定どおりに機能していることを確認するために、DAG にチェックを追加できます。たとえば、タスクがデータを BigQuery パーティションに push する場合、その次のタスクに、パーティションが生成されたこと、およびデータが正しいことを確認するためのチェックを追加します。

例:

読み込んでいます...

35. 同様のタスク / タスクグループ / DAG を Python コードで動的に生成する機会を探します。
a. こうすると、DAG の開発プロセスを簡素化および標準化できます。

例:

読み込んでいます...

36. DAG の単体テストを実装します。

例:

読み込んでいます...

37. Composer ローカル開発 CLI ツールを使用して、ローカル開発を行います。
a. Composer ローカル開発 CLI ツールは、Airflow 環境をローカルで実行することにより、Cloud Composer 2 向けの Apache Airflow DAG 開発を合理化します。このローカルの Airflow 環境では、特定の Cloud Composer バージョンのイメージが使用されます。

38. 可能な場合は Cloud Composer ステージング環境を保持して、本番環境にデプロイする前に完全な DAG 実行のテストを十分に行います。
a. 変数(Google Cloud Storage オペレーションの出力パス、構成を読み取るために使用するデータベースなど)を変更するには、DAG をパラメータ化します。DAG 内に値をハードコードして、環境に応じて手動で変更しないでください。

39. PylintFlake8 などの Python lint チェックツールを使用して、標準化されたコードの lint チェックを行います。

40. BlackYAPF などの Python フォーマット ツールを使用して、標準化されたコードのフォーマットをチェックします。

次のステップ

このブログ記事では、Google Cloud Composer で使用する Airflow DAG を開発する際のベスト プラクティスを網羅したチェックリストをご紹介しました。デベロッパーはこれらのベスト プラクティスに沿って、Cloud Composer の動作を最適化し、よく整理されて管理しやすい DAG を開発できます。

Cloud Composer の詳細については、以下の関連するブログ投稿とドキュメントのページをご覧ください。

-戦略的クラウド エンジニア Christian Yarros

投稿先