このガイドでは、Cloud Composer 環境で実行する Apache Airflow 有向非巡回グラフ(DAG)の作成方法を説明します。
Airflow DAG の構造化
Airflow DAG は Python ファイルで定義されており、DAG 定義、オペレーター、オペレーターの関係といったコンポーネントで構成されています。次のコード スニペットは、コンテキスト外の各コンポーネントの例を示しています。
DAG の定義です。
Airflow 2
Airflow 1
行う処理を記述するオペレーター。オペレーターをインスタンス化したものは、タスクと呼ばれます。
Airflow 2
Airflow 1
処理を完了する順序を記述するタスクの関係。
Airflow 2
Airflow 1
Python での完全な DAG ワークフローの例
次のワークフローは、hello_python
タスクと goodbye_bash
タスクの 2 つのタスクで構成される完全な作業用 DAG テンプレートです。
Airflow 2
Airflow 1
Airflow のチュートリアルとコンセプト
Airflow DAG の定義に関する詳細については、Airflow チュートリアルと Airflow のコンセプトをご覧ください。
Airflow オペレータ
次の例は、いくつかの一般的な Airflow オペレーターを示しています。Airflow オペレーターの信頼できるリファレンスについては、Apache Airflow API リファレンスをご覧になるか、core、contrib、providers のオペレーターのソースコードをご覧ください。
BashOperator
BashOperator を使用してコマンドライン プログラムを実行します。
Airflow 2
Airflow 1
Cloud Composer では、ワーカーの bash スクリプトで、指定されたコマンドが実行されます。 ワーカーは Debian ベースの Docker コンテナであり、いくつかのパッケージが含まれています。
PythonOperator
PythonOperator を使用して任意の Python コードを実行します。
環境で使用される Cloud Composer イメージのバージョンのパッケージを含むコンテナ内で、Cloud Composer によって Python コードが実行されます。
追加の Python パッケージをインストールするには、Python 依存関係のインストールをご覧ください。
Google Cloud のオペレーター
Google Cloud プロダクトを使用するタスクを実行するには、Google Cloud Airflow のオペレーターを使用します。Cloud Composer によって環境のプロジェクトへの Airflow 接続が自動的に構成されます。
BigQuery のオペレーターは、BigQuery でデータのクエリ、処理を行います。
Airflow 2
Airflow 1
Dataflow のオペレーターは、Dataflow で Apache Beam ジョブを実行します。
Cloud Data Fusion のオペレーターは、Cloud Data Fusion パイプラインの管理と実行を行います。
Dataproc のオペレーターは、Dataproc で Hadoop ジョブと Spark ジョブを実行します。
Datastore のオペレーターは、Datastore でデータの読み取りと書き込みを実行します。
AI Platform のオペレーターは、AI Platform でトレーニング ジョブと予測ジョブを実行します。
Cloud Storage のオペレーターは、Cloud Storage でデータの読み取りと書き込みを実行します。
EmailOperator
DAG からメールを送信するには、EmailOperator を使用します。Cloud Composer 環境からメールを送信するには、SendGrid を使用するように環境を構成する必要があります。
Airflow 2
Airflow 1
通知
DAG 内のオペレーターが失敗したときにメール通知を送信するには、email_on_failure
を True
に設定します。Cloud Composer 環境からメール通知を送信するには、SendGrid を使用するように環境を構成する必要があります。
Airflow 2
Airflow 1
DAG ワークフロー ガイドライン
Airflow 1 ではなく Airflow 2 を使用します。
Airflow コミュニティは Airflow 1 の新しいマイナー リリースやパッチリリースを公開しなくなりました。
ネストされたディレクトリの DAG の ZIP アーカイブにカスタム Python ライブラリを配置します。ライブラリは DAG ディレクトリの最上位には配置しないでください。
Airflow が
dags/
フォルダをスキャンする際、DAG フォルダの最上位にある Python モジュールの DAG、および最上位のdags/
フォルダに配置された ZIP アーカイブの最上位にある Python モジュールの DAG のみがチェックされます。部分文字列airflow
とDAG
の両方を含まない ZIP アーカイブ内の Python モジュールが存在する場合、Airflow は ZIP アーカイブの処理を停止します。Airflow はその時点までに見つかった DAG のみを返します。フォールト トラレンスのために、同じ Python モジュール内に複数の DAG オブジェクトを定義しないでください。
SubDAG を使用しないでください。代わりに、代替方法を使用してください。
DAG 解析に必要なファイルは、
data/
ディレクトリではなく、dags/
フォルダに配置します。DAG のテスト手順で推奨されているように、開発または変更された DAG をテストします。
開発された DAG によって DAG 解析時間が過剰に増加していないことを確認します。
Airflow タスクは複数の理由で失敗することがあります。DAG 全体の実行の失敗を回避するために、タスクの再試行を有効にすることをおすすめします。最大再試行回数を
0
に設定すると、再試行は行われません。default_task_retries
オプションをオーバーライドして、タスクの廃止に0
以外の値を設定することをおすすめします。さらに、再試行パラメータをタスクレベルで設定することもできます(必要な場合)。Airflow タスクで GPU を使用する場合は、GPU を備えたマシンを使用するノードに基づいて個別の GKE クラスタを作成します。GKEStartPodOperator を使用してタスクを実行します。
CPU とメモリを大量に消費するタスクを、他の Airflow コンポーネント(スケジューラ、ワーカー、ウェブサーバー)が実行されているクラスタのノードプールで実行しないでください。代わりに KubernetesPodOperator または GKEStartPodOperator を使用してください。
DAG を環境にデプロイする場合は、DAG の解釈と実行に不可欠なファイルのみを
/dags
フォルダにアップロードします。/dags
フォルダ内の DAG ファイルの数を制限します。Airflow は
/dags
フォルダ内の DAG を継続的に解析しています。解析は DAG フォルダをループ処理するプロセスで、(必要な依存関係とともに)読み込む必要があるファイルの数が、DAG 解析とタスク スケジューリングのパフォーマンスに影響を与えます。それぞれ 1 つの DAG を持つ 10,000 ファイルよりも 100 個の DAG を持つ 100 ファイルを使用する方がはるかに効率的であるため、そのような最適化をおすすめします。この最適化は、解析時間と DAG の作成と管理の効率のバランスを取るものです。たとえば、10,000 個の DAG ファイルをデプロイする場合は、100 個の ZIP ファイルを作成し、それぞれに 100 個の DAG ファイルを含めることができます。
前述のヒントに加えて、10,000 個を超える DAG ファイルがある場合は、プログラムによる方法で DAG を生成するのが適切な選択肢である可能性があります。たとえば、いくつかの DAG オブジェクト(例: 20,100 個の DAG オブジェクト)を生成する単一の Python DAG ファイルを実装できます。
DAG の作成に関するよくある質問
複数の DAG で同じまたは類似したタスクを実行する場合、コードの繰り返しを最小限に抑えるにはどうすればよいですか?
コードの繰り返しを最小限に抑えるには、ライブラリを定義したりラッパーを定義したりすることをおすすめします。
DAG ファイル間でコードを再利用するにはどうすればよいですか?
ユーティリティ関数をローカルの Python ライブラリに入れて、その関数をインポートしてください。環境のバケット内の dags/
フォルダに置かれた任意の DAG の関数を参照できます。
異なる定義が作成されるリスクを最小限に抑えるにはどうすればよいですか?
たとえば、元データを集計して収益の指標を作成しようとしているチームが 2 つあるとします。これらのチームが同じ目的を達成するために、2 つのわずかに異なったタスクを作成する場合を考えてください。その場合、収益データの処理に対するライブラリを定義すると、DAG の実装者が、集計対象の収益の定義を明確にできます。
DAG 間の依存関係を設定するにはどうすればよいですか?
これは依存関係をどのように定義するかによって異なります。
DAG A と DAG B の 2 つの DAG があり、DAG A の後に DAG B をトリガーする場合は、DAG A の末尾に TriggerDagRunOperator
を配置します。
DAG B が Pub/Sub メッセージなどの DAG A が生成するアーティファクトのみに依存している場合は、センサーを使用するほうが有効な場合があります。
DAG B が DAG A と密に統合されている場合は、2 つの DAG を 1 つの DAG にマージできる場合があります。
DAG とそのタスクに一意の実行 ID を渡すにはどうすればよいですか?
たとえば、Dataproc のクラスタ名とファイルパスを渡すとします。
ランダムな一意の ID は、PythonOperator
で str(uuid.uuid4())
を返すことにより生成できます。これにより ID が XComs
に配置され、テンプレート フィールドを介して他のオペレーターで ID を参照できるようになります。
DagRun 固有の ID のほうが有用な場合もあるので、uuid
を生成する前にその可能性を検討してください。マクロを使用して Jinja 置換でこれらの ID を参照することもできます。
DAG のタスクを分けるにはどうすればよいですか?
各タスクは、べき等の作業ユニットである必要があります。そのため、PythonOperator
内で実行される複雑なプログラムのように、単一タスク内でマルチステップ ワークフローをカプセル化するのを避ける必要があります。
複数のソースからのデータを集計する場合、単一の DAG に複数のタスクを定義するべきですか?
たとえば、元データを含む複数のテーブルがあり、各テーブルの日次集計を作成するとします。タスクは互いに依存していません。この場合、各テーブルにタスクと DAG を 1 つずつ作成するべきでしょうか、それとも、1つの大きな DAG を作成しますか。
各タスクで、複数の同じ DAG レベルのプロパティ(schedule_interval
など)を共有しても差し支えない場合は、単一の DAG に複数のタスクを定義するのが有効な選択肢です。一方、コードの繰り返しを最小限に抑える必要がある場合は、モジュールの globals()
に複数の DAG を配置することで、これらを単一の Python モジュールから生成できます。
DAG で実行中の同時タスクの数を制限するにはどうすればよいですか?
たとえば、API の使用量上限や割り当ての超過を回避する、または過剰な同時プロセスの実行を回避する必要がある場合が該当します。
Airflow ウェブ UI で Airflow Pools を定義し、DAG 内のタスクを既存のプールに関連付けることができます。
オペレーターの使用に関するよくある質問
DockerOperator
を使用するべきですか?
リモートでの Docker のインストール(環境のクラスタ内ではありません)でコンテナの起動に使用されていない場合は、DockerOperator
の使用はおすすめしません。Cloud Composer 環境では、オペレーターは Docker デーモンにアクセスできません。
代わりに、KubernetesPodOperator
または GKEStartPodOperator
を使用します。これらのオペレーターは、Kubernetes Pod をそれぞれ Kubernetes クラスタまたは GKE クラスタに起動します。環境のクラスタに Pod を起動することは、リソースが競合する可能性があるためおすすめしません。
SubDagOperator
を使用するべきですか?
SubDagOperator
の使用はおすすめしません。
タスクのグループ化の手順に記載されている代替手法を使用します
Python オペレーターを完全に分離するには、PythonOperators
内でのみ Python コードを実行するべきですか?
目的に応じていくつかの選択肢があります。
Python 依存関係の分離を維持することが唯一の目的である場合は、PythonVirtualenvOperator
を使用できます。
KubernetesPodOperator
の使用を検討してください。この演算子を使用すると、Kubernetes Pod を定義して、他のクラスタで Pod を実行できます。
カスタムのバイナリ パッケージや非 PyPI パッケージを追加するにはどうすればよいですか?
プライベート パッケージ リポジトリでホストされているパッケージをインストールできます。
また、KubernetesPodOperator
を使用して、カスタム パッケージで構築された独自のイメージで Kubernetes Pod を実行することもできます。
DAG とそのタスクに対して一律に引数を渡すにはどうすればよいですか?
Jinja テンプレートに対する Airflow の組み込みサポートを使用して、テンプレート フィールドで使用できる引数を渡すことができます。
テンプレート置換はいつ行われますか?
テンプレート置換は、オペレーターの pre_execute
関数が呼び出される直前の Airflow ワーカーで行われます。これは、実際にはタスクが実行される直前までテンプレート置換が行われないことを意味します。
どのオペレーター引数がテンプレート置換をサポートするのかを知るにはどうすればよいですか?
Jinja2 のテンプレート置換をサポートするオペレーターの引数は、そのように明示的にマークされます。
オペレーターの定義内の template_fields
フィールドを確認してください。このフィールドにテンプレート置換の対象となる引数名のリストが示されています。
たとえば、BashOperator
では、引数 bash_command
および env
のテンプレートがサポートされています。
サポートが終了した Airflow オペレーターの使用を回避する
次の表に示す演算子は非推奨です。DAG では使用しないでください。代わりに、提供されている最新の方法を使用してください。
非推奨の演算子 | 使用する演算子 |
---|---|
BigQueryExecuteQueryOperator | BigQueryInsertJobOperator |
BigQueryPatchDatasetOperator | BigQueryUpdateTableOperator |
DataflowCreateJavaJobOperator | BeamRunJavaPipelineOperator |
DataflowCreatePythonJobOperator | BeamRunPythonPipelineOperator |
DataprocScaleClusterOperator | DataprocUpdateClusterOperator |
DataprocSubmitPigJobOperator | DataprocSubmitJobOperator |
DataprocSubmitSparkSqlJobOperator | DataprocSubmitJobOperator |
DataprocSubmitSparkJobOperator | DataprocSubmitJobOperator |
DataprocSubmitHadoopJobOperator | DataprocSubmitJobOperator |
DataprocSubmitPySparkJobOperator | DataprocSubmitJobOperator |
MLEngineManageModelOperator | MLEngineCreateModelOperator、MLEngineGetModelOperator |
MLEngineManageVersionOperator | MLEngineCreateVersion、MLEngineSetDefaultVersion、MLEngineListVersions、MLEngineDeleteVersion |
GCSObjectsWtihPrefixExistenceSensor | GCSObjectsWithPrefixExistenceSensor |
次のステップ
- DAG のトラブルシューティング
- トラブルシューティングのスケジューラ
- Google のオペレーター
- Google Cloud Platform の演算子
- Apache Airflow チュートリアル
- Apache Airflow API リファレンス
- GitHub 上の core Airflow オペレーター。 Airflow リリースのブランチをご覧ください。
- GitHub 上の contrib Airflow オペレーター。 Airflow リリースのブランチをご覧ください。