DAG で遅延可能な演算子を使用する

Cloud Composer 1 | Cloud Composer 2

このページでは、環境で遅延可能な演算子のサポートを有効にし、DAG で遅延可能な Google Cloud 演算子を使用する方法について説明します。

Cloud Composer での延期可能な演算子について

少なくとも 1 つのトリガー インスタンス(または復元性の高い環境では少なくとも 2 つ)がある場合は、DAG で遅延可能な演算子とトリガーを使用できます。

遅延可能な演算子の場合、Airflow はタスクの実行を次のステージに分割します。

  1. オペレーションを開始します。この段階で、タスクは Airflow ワーカー スロットを占有しています。タスクは、ジョブを別のサービスに委任するオペレーションを実行します。

    たとえば、BigQuery ジョブの実行には数秒から数時間かかることがあります。ジョブが作成された後、オペレーションは作業識別子(BigQuery ジョブ ID)を Airflow トリガーに渡します。

  2. トリガーは、ジョブが終了するまでモニタリングします。このステージでは、ワーカー スロットは占有されません。Airflow トリガーは非同期アーキテクチャを備えており、数百ものジョブを処理できます。トリガーは、ジョブの終了を検出すると、最終ステージをトリガーするイベントを送信します。

  3. 最終ステージでは、Airflow ワーカーがコールバックを実行します。このコールバックでは、タスクを成功とマークするか、別のオペレーションを実行して、トリガーでモニタリングするジョブを設定できます。

トリガーはステートレスであるため、割り込みや再起動に対する復元性があります。このため、短いと予想される最後のステージ中に再起動が発生しない限り、Pod の再起動があっても長時間実行ジョブを復元可能です。

始める前に

  • 遅延可能な演算子とセンサーは Cloud Composer 2 環境で利用できます。次のものが必要です。
    • Cloud Composer 2.0.31 以降のバージョン
    • Airflow 2.2.5、2.3.3 以降のバージョン

遅延可能な演算子のサポートを有効にする

Airflow トリガーという環境コンポーネントは、環境内のすべての遅延タスクを非同期的にモニタリングします。このようなタスクからの遅延オペレーションが完了すると、トリガーによってタスクが Airflow ワーカーに渡されます。

DAG で遅延可能モードを使用するには、環境内に少なくとも 1 つのトリガー インスタンス(または高復元環境では少なくとも 2 つ)が必要です。 トリガーは、環境の作成時に構成することも、既存の環境のトリガー数とパフォーマンス パラメータを調整することもできます。

遅延可能モードをサポートする Google Cloud の演算子

一部の Airflow オペレータのみが遅延可能なモデルをサポートするように拡張されています。 次のリストは、遅延可能モードをサポートする airflow.providers.google.operators.cloud パッケージの演算子のリファレンスです。必要最小限の airflow.providers.google.operators.cloud パッケージ バージョンを含む列は、その演算子が遅延可能モードをサポートしている最も古いパッケージ バージョンを表します。

Cloud Composer 演算子

演算子名必要な apache-airflow-providers-google のバージョン
CloudComposerCreateEnvironmentOperator 6.4.0
CloudComposerDeleteEnvironmentOperator 6.4.0
CloudComposerUpdateEnvironmentOperator 6.4.0

BigQuery Operators

演算子名必要な apache-airflow-providers-google のバージョン
BigQueryCheckOperator 8.4.0
BigQueryValueCheckOperator 8.4.0
BigQueryIntervalCheckOperator 8.4.0
BigQueryGetDataOperator 8.4.0
BigQueryInsertJobOperator 8.4.0

BigQuery Data Transfer Service の演算子

演算子名必要な apache-airflow-providers-google のバージョン
BigQueryDataTransferServiceStartTransferRunsOperator 8.9.0

Cloud Build の演算子

演算子名必要な apache-airflow-providers-google のバージョン
CloudBuildCreateBuildOperator 8.7.0

Cloud SQL の演算子

演算子名必要な apache-airflow-providers-google のバージョン
CloudSQLExportInstanceOperator 10.3.0

Dataflow の演算子

演算子名必要な apache-airflow-providers-google のバージョン
DataflowTemplatedJobStartOperator 8.9.0
DataflowStartFlexTemplateOperator 8.9.0

Cloud Data Fusion の演算子

演算子名必要な apache-airflow-providers-google のバージョン
CloudDataFusionStartPipelineOperator 8.9.0

Google Kubernetes Engine の演算子

演算子名必要な apache-airflow-providers-google のバージョン
GKEDeleteClusterOperator 9.0.0
GKECreateClusterOperator 9.0.0

AI Platform の演算子

演算子名必要な apache-airflow-providers-google のバージョン
MLEngineStartTrainingJobOperator 8.9.0

DAG で遅延可能な演算子を使用する

すべての Google Cloud オペレーターの一般的な規則では、deferrable ブール型パラメータを使用して遅延可能モードを有効にします。Google Cloud 演算子にこのパラメータがない場合、遅延可能モードで実行することはできません。他の演算子には異なる規則が適用される場合があります。たとえば、一部のコミュニティ オペレータには、名前に Async 接尾辞が付いた別のクラスがあります。

次の DAG の例では、遅延可能モードで DataprocSubmitJobOperator 演算子を使用しています。

PYSPARK_JOB = {
    "reference": { "project_id": "PROJECT_ID" },
    "placement": { "cluster_name": "PYSPARK_CLUSTER_NAME" },
    "pyspark_job": {
        "main_python_file_uri": "gs://dataproc-examples/pyspark/hello-world/hello-world.py"
    },
}

DataprocSubmitJobOperator(
        task_id="dataproc-deferrable-example",
        job=PYSPARK_JOB,
        deferrable=True,
    )

トリガーログを表示する

トリガーによって、他の環境コンポーネントのログとともに使用可能なログが生成されます。環境ログの表示について詳しくは、ログを表示をご覧ください。

トリガーをモニタリングする

トリガー コンポーネントのモニタリングの詳細については、Airflow 指標をご覧ください。

トリガーのモニタリングに加えて、環境のモニタリング ダッシュボードの [Uncompleted Task] 指標で遅延可能なタスクの数を確認できます。

次のステップ