Cloud Composer 1 | Cloud Composer 2
このページでは、環境で遅延可能な演算子のサポートを有効にし、DAG で遅延可能な Google Cloud 演算子を使用する方法について説明します。
Cloud Composer での延期可能な演算子について
少なくとも 1 つのトリガー インスタンス(または復元性の高い環境では少なくとも 2 つ)がある場合は、DAG で遅延可能な演算子とトリガーを使用できます。
遅延可能な演算子の場合、Airflow はタスクの実行を次のステージに分割します。
オペレーションを開始します。この段階で、タスクは Airflow ワーカー スロットを占有しています。タスクが、ジョブを別のサービスに委任するオペレーションを実行します。
たとえば、BigQuery ジョブの実行には数秒から数時間かかることがあります。ジョブを作成すると、オペレーションによって作業識別子(BigQuery ジョブ ID)が Airflow トリガーに渡されます。
トリガーによって、ジョブが完了するまでモニタリングされます。このステージでは、ワーカー スロットは占有されません。Airflow トリガーは非同期アーキテクチャを備えており、このようなジョブを数百個処理できます。トリガーは、ジョブの終了を検出すると、最終ステージをトリガーするイベントを送信します。
最終ステージでは、Airflow ワーカーがコールバックを実行します。このコールバックでは、タスクを成功とマークするか、別のオペレーションを実行して、トリガーでモニタリングするジョブを設定できます。
triggerer はステートレスであるため、中断や再起動に対する復元性があります。このため、短いと予想される最後のステージ中に再起動が発生しない限り、Pod の再起動があっても長時間実行ジョブを復元可能です。
始める前に
- 遅延可能な演算子とセンサーは Cloud Composer 2 環境で利用できます。次のものが必要です。
- Cloud Composer 2.0.31 以降のバージョン
- Airflow 2.2.5、2.3.3 以降のバージョン
遅延可能な演算子のサポートを有効にする
Airflow トリガーという環境コンポーネントは、環境内のすべての遅延タスクを非同期的にモニタリングします。このようなタスクからの遅延可能なオペレーションが完了すると、トリガーはタスクを Airflow ワーカーに渡します。
DAG で遅延可能モードを使用するには、環境内に少なくとも 1 つのトリガー インスタンス(または高復元環境では少なくとも 2 つ)が必要です。 トリガーは、環境の作成時に構成できます。または、既存の環境のトリガーの数とパフォーマンス パラメータを調整することもできます。
遅延可能モードをサポートする Google Cloud の演算子
一部の Airflow オペレータのみが遅延可能なモデルをサポートするように拡張されています。
次のリストは、遅延可能モードをサポートする airflow.providers.google.operators.cloud
パッケージの演算子のリファレンスです。必要最小限の airflow.providers.google.operators.cloud
パッケージ バージョンを含む列は、その演算子が遅延可能モードをサポートしている最も古いパッケージ バージョンを表します。
Cloud Composer 演算子
演算子名 | 必要な apache-airflow-providers-google バージョン |
---|---|
CloudComposerCreateEnvironmentOperator | 6.4.0 |
CloudComposerDeleteEnvironmentOperator | 6.4.0 |
CloudComposerUpdateEnvironmentOperator | 6.4.0 |
BigQuery Operators
演算子名 | 必要な apache-airflow-providers-google バージョン |
---|---|
BigQueryCheckOperator | 8.4.0 |
BigQueryValueCheckOperator | 8.4.0 |
BigQueryIntervalCheckOperator | 8.4.0 |
BigQueryGetDataOperator | 8.4.0 |
BigQueryInsertJobOperator | 8.4.0 |
BigQuery Data Transfer Service の演算子
演算子名 | 必要な apache-airflow-providers-google バージョン |
---|---|
BigQueryDataTransferServiceStartTransferRunsOperator | 8.9.0 |
Cloud Build の演算子
演算子名 | 必要な apache-airflow-providers-google バージョン |
---|---|
CloudBuildCreateBuildOperator | 8.7.0 |
Cloud SQL の演算子
演算子名 | 必要な apache-airflow-providers-google バージョン |
---|---|
CloudSQLExportInstanceOperator | 10.3.0 |
Dataflow の演算子
演算子名 | 必要な apache-airflow-providers-google バージョン |
---|---|
DataflowTemplatedJobStartOperator | 8.9.0 |
DataflowStartFlexTemplateOperator | 8.9.0 |
Cloud Data Fusion の演算子
演算子名 | 必要な apache-airflow-providers-google バージョン |
---|---|
CloudDataFusionStartPipelineOperator | 8.9.0 |
Dataproc の演算子
演算子名 | 必要な apache-airflow-providers-google バージョン |
---|---|
DataprocCreateClusterOperator | 8.9.0 |
DataprocDeleteClusterOperator | 8.9.0 |
DataprocJobBaseOperator | 8.4.0 |
DataprocInstantiateWorkflowTemplateOperator | 9.0.0 |
DataprocInstantiateInlineWorkflowTemplateOperator | 10.1.0 |
DataprocSubmitJobOperator | 8.4.0 |
DataprocUpdateClusterOperator | 8.9.0 |
DataprocCreateBatchOperator | 8.9.0 |
Google Kubernetes Engine の演算子
演算子名 | 必要な apache-airflow-providers-google バージョン |
---|---|
GKEDeleteClusterOperator | 9.0.0 |
GKECreateClusterOperator | 9.0.0 |
AI Platform の演算子
演算子名 | 必要な apache-airflow-providers-google バージョン |
---|---|
MLEngineStartTrainingJobOperator | 8.9.0 |
DAG で遅延可能な演算子を使用する
すべての Google Cloud オペレーターの一般的な規則では、deferrable
ブール型パラメータを使用して遅延可能モードを有効にします。Google Cloud 演算子にこのパラメータがない場合、遅延可能モードで実行することはできません。他の演算子の規則は異なります。たとえば、一部のコミュニティ演算子には、名前に Async
接尾辞が付いた別のクラスがあります。
次の DAG の例では、遅延可能モードで DataprocSubmitJobOperator
演算子を使用しています。
PYSPARK_JOB = {
"reference": { "project_id": "PROJECT_ID" },
"placement": { "cluster_name": "PYSPARK_CLUSTER_NAME" },
"pyspark_job": {
"main_python_file_uri": "gs://dataproc-examples/pyspark/hello-world/hello-world.py"
},
}
DataprocSubmitJobOperator(
task_id="dataproc-deferrable-example",
job=PYSPARK_JOB,
deferrable=True,
)
トリガーログを表示する
トリガーは、他の環境コンポーネントのログとともに使用可能なログを生成します。環境ログの表示について詳しくは、ログを表示をご覧ください。
トリガーをモニタリングする
トリガー コンポーネントのモニタリングの詳細については、Airflow の指標をご覧ください。
トリガーのモニタリングに加えて、環境のモニタリング ダッシュボードの [Uncompleted Task] 指標で遅延可能なタスクの数を確認できます。