Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
このガイドでは、Cloud Composer 環境で実行する Apache Airflow 有向非巡回グラフ(DAG)の作成方法を説明します。
Apache Airflow では強固な DAG とタスクの分離が実現されないため、DAG の干渉を回避するために分離された状態で本番環境とテスト環境を使用することをおすすめします。詳細については、DAG のテストをご覧ください。
Airflow DAG の構造化
Airflow DAG は Python ファイルで定義されており、以下のコンポーネントで構成されています。
- DAG 定義
- Airflow オペレーター
- オペレーターの関係
次のコード スニペットは、コンテキスト外の各コンポーネントの例を示しています。
DAG の定義
次の例は、Airflow DAG の定義を示しています。
Airflow 2
Airflow 1
オペレーターとタスク
Airflow 演算子は行う処理を記述します。タスクタスクは、演算子の特定のインスタンスです。
Airflow 2
Airflow 1
タスクの関係
タスクの関係は、作業を完了する必要がある順序を記述します。
Airflow 2
Airflow 1
Python の完全な DAG ワークフローの例
次のワークフローは、hello_python
タスクと goodbye_bash
タスクという 2 つのタスクで構成される完全な作業用 DAG テンプレートです。
Airflow 2
Airflow 1
Airflow DAG の定義の詳細については、Airflow チュートリアルと Airflow のコンセプトをご覧ください。
Airflow オペレーター
次の例は、いくつかの一般的な Airflow オペレーターを示しています。Airflow オペレーターの信頼できるリファレンスについては、オペレーターとフックのリファレンスとプロバイダ インデックスをご覧ください。
BashOperator
BashOperator を使用してコマンドライン プログラムを実行します。
Airflow 2
Airflow 1
Cloud Composer では、Airflow ワーカーの bash スクリプトで、指定されたコマンドが実行されます。 ワーカーは Debian ベースの Docker コンテナであり、いくつかのパッケージが含まれています。
gcloud
コマンド。Cloud Storage バケットを操作するためのgcloud storage
サブコマンドなど。bq
コマンドkubectl
コマンド
PythonOperator
PythonOperator を使用して任意の Python コードを実行します。
環境で使用される Cloud Composer イメージのバージョンのパッケージを含むコンテナ内で、Cloud Composer によって Python コードが実行されます。
追加の Python パッケージをインストールするには、Python 依存関係のインストールをご覧ください。
Google Cloud のオペレーター
Google Cloud プロダクトを使用するタスクを実行するには、Google Cloud Airflow オペレーターを使用します。たとえば、BigQuery のオペレーターは、BigQuery でデータのクエリ、処理を行います。
Google Cloud と Google Cloud が提供する個々のサービス用の Airflow オペレーターが多数あります。完全なリストについては、Google Cloud のオペレーターをご覧ください。
Airflow 2
Airflow 1
EmailOperator
DAG からメールを送信するには、EmailOperator を使用します。Cloud Composer 環境からメールを送信するには、SendGrid を使用するように環境を構成します。
Airflow 2
Airflow 1
オペレーター失敗時の通知
DAG 内のオペレーターが失敗したときにメール通知を送信するには、email_on_failure
を True
に設定します。Cloud Composer 環境からメール通知を送信するには、SendGrid を使用するように環境を構成する必要があります。
Airflow 2
Airflow 1
DAG ワークフローのガイドライン
ネストされたディレクトリの DAG の ZIP アーカイブにカスタム Python ライブラリを配置します。ライブラリは DAG ディレクトリの最上位には配置しないでください。
Airflow が
dags/
フォルダをスキャンするとき、DAG フォルダの最上位にある Python モジュールの DAG、および最上位のdags/
フォルダに配置された ZIP アーカイブの最上位にある Python モジュールの DAG のみがチェックされます。部分文字列airflow
とDAG
の両方を含まない ZIP アーカイブ内の Python モジュールが存在する場合、Airflow は ZIP アーカイブの処理を停止します。Airflow はその時点までに見つかった DAG のみを返します。Airflow 1 の代わりに Airflow 2 を使用します。
Airflow コミュニティでは、Airflow 1 の新しいマイナー リリースやパッチリリースが公開されなくなりました。
フォールト トラレンスのために、同じ Python モジュール内に複数の DAG オブジェクトを定義しないでください。
SubDAG を使用しないでください。代わりに、DAG 内にタスクをグループ化します。
DAG 解析に必要なファイルは、
data/
ディレクトリではなく、dags/
フォルダに配置します。DAG のテスト手順で推奨されているように、開発または変更された DAG をテストします。
開発された DAG によって DAG 解析時間が過剰に増加していないことを確認します。
Airflow タスクは、いくつかの理由で失敗することがあります。DAG の実行全体のエラーを回避するには、タスクの再試行を有効にすることをおすすめします。 最大再試行回数を
0
に設定すると、再試行は行われなくなります。default_task_retries
オプションをオーバーライドし、タスク再試行の値を0
以外に設定することをおすすめします。また、タスクレベルでretries
パラメータを設定できます。Airflow タスクで GPU を使用する場合は、GPU 搭載マシンを使用するノードに基づいて、別の GKE クラスタを作成します。GKEStartPodOperator を使用してタスクを実行します。
CPU とメモリを大量に消費するタスクを、他の Airflow コンポーネント(スケジューラ、ワーカー、ウェブサーバー)が実行されているクラスタのノードプールで実行しないでください。 代わりに、KubernetesPodOperator または GKEStartPodOperator を使用します。
DAG を環境にデプロイする場合は、DAG の解釈と実行に不可欠なファイルのみを
/dags
フォルダにアップロードします。/dags
フォルダ内の DAG ファイルの数を制限します。Airflow は、
/dags
フォルダ内の DAG を継続的に解析します。解析は、DAG フォルダをくまなくループするプロセスであり、(依存関係とともに)読み込む必要があるファイル数は、DAG の解析とタスクのスケジューリングのパフォーマンスに影響を及ぼします。DAG が 100 個あるファイルを 100 個使用するほうが、1 個の DAG を 10,000 個使用するよりはるかに効率的であるため、このような最適化をおすすめします。この最適化は、解析時間と DAG の作成と管理の効率のバランスを取るものです。また、たとえば、10,000 個の DAG ファイルをデプロイするには、それぞれに 100 個の DAG ファイルを含む 100 個の ZIP ファイルを作成することができます。
前述のヒントに加えて、10,000 個を超える DAG ファイルがある場合は、プログラムによる方法で DAG を生成するのが適切な選択肢である可能性があります。たとえば、いくつかの DAG オブジェクト(例: 20,100 個の DAG オブジェクト)を生成する単一の Python DAG ファイルを実装できます。
サポートが終了した Airflow オペレーターの使用を回避する
次の表に示すオペレーターはサポートが終了しました。これらの演算子の一部は、Cloud Composer 1 の初期のバージョンでサポートされていました。DAG では使用しないでください。代わりに、提供されている最新の代替方法を使用します。
非推奨の演算子 | 使用する演算子 |
---|---|
BigQueryExecuteQueryOperator | BigQueryInsertJobOperator |
BigQueryPatchDatasetOperator | BigQueryUpdateTableOperator |
DataflowCreateJavaJobOperator | BeamRunJavaPipelineOperator |
DataflowCreatePythonJobOperator | BeamRunPythonPipelineOperator |
DataprocScaleClusterOperator | DataprocUpdateClusterOperator |
DataprocSubmitPigJobOperator | DataprocSubmitJobOperator |
DataprocSubmitSparkSqlJobOperator | DataprocSubmitJobOperator |
DataprocSubmitSparkJobOperator | DataprocSubmitJobOperator |
DataprocSubmitHadoopJobOperator | DataprocSubmitJobOperator |
DataprocSubmitPySparkJobOperator | DataprocSubmitJobOperator |
MLEngineManageModelOperator | MLEngineCreateModelOperator、MLEngineGetModelOperator |
MLEngineManageVersionOperator | MLEngineCreateVersion、MLEngineSetDefaultVersion、MLEngineListVersions、MLEngineDeleteVersion |
GCSObjectsWtihPrefixExistenceSensor | GCSObjectsWithPrefixExistenceSensor |
DAG の作成に関するよくある質問
複数の DAG で同じまたは類似したタスクを実行する場合、コードの繰り返しを最小限に抑えるにはどうすればよいですか?
コードの繰り返しを最小限に抑えるには、ライブラリを定義したりラッパーを定義したりすることをおすすめします。
DAG ファイル間でコードを再利用するにはどうすればよいですか?
ユーティリティ関数をローカルの Python ライブラリに入れて、その関数をインポートしてください。環境のバケット内の dags/
フォルダに置かれた任意の DAG の関数を参照できます。
異なる定義が作成されるリスクを最小限に抑えるにはどうすればよいですか?
たとえば、元データを集計して収益の指標を作成しようとしているチームが 2 つあるとします。これらのチームが同じ目的を達成するために、2 つのわずかに異なったタスクを作成する場合を考えてください。その場合、収益データの処理に対するライブラリを定義すると、DAG の実装者が、集計対象の収益の定義を明確にできます。
DAG 間の依存関係を設定するにはどうすればよいですか?
これは依存関係をどのように定義するかによって異なります。
DAG A と DAG B の 2 つの DAG があり、DAG A の後に DAG B をトリガーする場合は、DAG A の末尾に TriggerDagRunOperator
を配置します。
DAG B が Pub/Sub メッセージなどの DAG A が生成するアーティファクトのみに依存している場合は、センサーを使用するほうが有効な場合があります。
DAG B が DAG A と密に統合されている場合は、2 つの DAG を 1 つの DAG にマージできる場合があります。
DAG とそのタスクに一意の実行 ID を渡すにはどうすればよいですか?
たとえば、Dataproc のクラスタ名とファイルパスを渡すとします。
ランダムな一意の ID は、PythonOperator
で str(uuid.uuid4())
を返すことにより生成できます。これにより ID が XComs
に配置され、テンプレート フィールドを介して他のオペレーターで ID を参照できるようになります。
DagRun 固有の ID のほうが有用な場合もあるので、uuid
を生成する前にその可能性を検討してください。マクロを使用して Jinja 置換でこれらの ID を参照することもできます。
DAG のタスクを分けるにはどうすればよいですか?
各タスクは、べき等の作業ユニットである必要があります。そのため、PythonOperator
内で実行される複雑なプログラムのように、単一タスク内でマルチステップ ワークフローをカプセル化するのを避ける必要があります。
複数のソースからのデータを集計する場合、単一の DAG に複数のタスクを定義するべきですか?
たとえば、元データを含む複数のテーブルがあり、各テーブルの日次集計を作成するとします。タスクは互いに依存していません。この場合、各テーブルにタスクと DAG を 1 つずつ作成するべきでしょうか、それとも、1つの大きな DAG を作成しますか。
各タスクで、複数の同じ DAG レベルのプロパティ(schedule_interval
など)を共有しても差し支えない場合は、単一の DAG に複数のタスクを定義するのが有効な選択肢です。一方、コードの繰り返しを最小限に抑える必要がある場合は、モジュールの globals()
に複数の DAG を配置することで、これらを単一の Python モジュールから生成できます。
DAG で実行中の同時タスクの数を制限するにはどうすればよいですか?
たとえば、API の使用量上限や割り当ての超過を回避する、または過剰な同時プロセスの実行を回避する必要がある場合が該当します。
Airflow ウェブ UI で Airflow Pools を定義し、DAG 内のタスクを既存のプールに関連付けることができます。
オペレーターの使用に関するよくある質問
DockerOperator
を使用するべきですか?
リモートでの Docker のインストール(環境のクラスタ内ではありません)でコンテナの起動に使用されていない場合は、DockerOperator
の使用はおすすめしません。Cloud Composer 環境では、オペレーターは Docker デーモンにアクセスできません。
代わりに、KubernetesPodOperator
または GKEStartPodOperator
を使用します。これらのオペレーターは、Kubernetes Pod をそれぞれ Kubernetes クラスタまたは GKE クラスタに起動します。環境のクラスタに Pod を起動することは、リソースが競合する可能性があるためおすすめしません。
SubDagOperator
を使用するべきですか?
SubDagOperator
の使用はおすすめしません。
タスクのグループ化に記載されている代替手法を使用します
Python オペレーターを完全に分離するには、PythonOperators
内でのみ Python コードを実行するべきですか?
目的に応じていくつかの選択肢があります。
Python 依存関係の分離を維持することが唯一の目的である場合は、PythonVirtualenvOperator
を使用できます。
KubernetesPodOperator
を使用することを検討してください。このオペレーターを使用すると、Kubernetes Pod を定義して、他のクラスタで Pod を実行できます。
カスタムのバイナリ パッケージや非 PyPI パッケージを追加するにはどうすればよいですか?
プライベート パッケージ リポジトリでホストされているパッケージをインストールできます。
DAG とそのタスクに対して一律に引数を渡すにはどうすればよいですか?
Jinja テンプレートに対する Airflow の組み込みサポートを使用して、テンプレート フィールドで使用できる引数を渡すことができます。
テンプレート置換はいつ行われますか?
テンプレート置換は、オペレーターの pre_execute
関数が呼び出される直前の Airflow ワーカーで行われます。これは、実際にはタスクが実行される直前までテンプレート置換が行われないことを意味します。
どの演算子引数がテンプレート置換をサポートするのかを知るにはどうすればよいですか?
Jinja2 のテンプレート置換をサポートするオペレーターの引数は、そのように明示的にマークされます。
オペレーターの定義内の template_fields
フィールドを確認してください。このフィールドにテンプレート置換の対象となる引数名のリストが示されています。
たとえば、BashOperator
では、引数 bash_command
および env
のテンプレートがサポートされています。