DAG で遅延可能な演算子を使用する

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

このページでは、環境で遅延可能な演算子のサポートを有効にし、DAG で遅延可能な Google Cloud 演算子を使用する方法について説明します。

Cloud Composer での延期可能な演算子について

少なくとも 1 つのトリガー インスタンス(または復元性の高い環境では少なくとも 2 つ)がある場合は、DAG で遅延可能な演算子とトリガーを使用できます。

遅延可能な演算子の場合、Airflow はタスクの実行を次のステージに分割します。

  1. オペレーションを開始します。この段階で、タスクは Airflow ワーカー スロットを占有しています。タスクは、ジョブを別のサービスに委任するオペレーションを実行します。

    たとえば、BigQuery ジョブの実行には数秒から数時間かかることがあります。ジョブの作成後、オペレーションは作業 ID(BigQuery ジョブ ID)を Airflow トリガーに渡します。

  2. トリガーはジョブが終了するまでモニタリングします。このステージでは、ワーカー スロットは占有されません。Airflow トリガーは非同期アーキテクチャを備えており、数百のこのようなジョブを処理できます。トリガーはジョブの終了を検出すると、最終ステージをトリガーするイベントを送信します。

  3. 最終ステージでは、Airflow ワーカーがコールバックを実行します。このコールバックでは、タスクを成功とマークするか、別のオペレーションを実行して、トリガーでモニタリングするジョブを設定できます。

トリガーはステートレスであるため、中断や再起動に対してレジリエンスがあります。このため、短いと予想される最後のステージ中に再起動が発生しない限り、Pod の再起動があっても長時間実行ジョブを復元可能です。

始める前に

  • 遅延可能な演算子とセンサーは Cloud Composer 2 環境で利用できます。次のものが必要です。
    • Cloud Composer 2.0.31 以降のバージョン
    • Airflow 2.2.5、2.3.3 以降のバージョン

遅延可能な演算子のサポートを有効にする

Airflow トリガーという環境コンポーネントは、環境内のすべての遅延タスクを非同期的にモニタリングします。このようなタスクからの遅延可能なオペレーションが完了すると、トリガーはタスクを Airflow ワーカーに渡します。

DAG で遅延可能モードを使用するには、環境内に少なくとも 1 つのトリガー インスタンス(または高復元環境では少なくとも 2 つ)が必要です。 トリガーは、環境の作成時に構成できます。または、既存の環境のトリガーの数とパフォーマンス パラメータを調整することもできます。

遅延可能モードをサポートする Google Cloud の演算子

一部の Airflow オペレータのみが遅延可能なモデルをサポートするように拡張されています。 次のリストは、遅延可能モードをサポートする airflow.providers.google.operators.cloud パッケージの演算子のリファレンスです。必要最小限の airflow.providers.google.operators.cloud パッケージ バージョンを含む列は、その演算子が遅延可能モードをサポートしている最も古いパッケージ バージョンを表します。

Cloud Composer 演算子

演算子名必要な apache-airflow-providers-google のバージョン
CloudComposerCreateEnvironmentOperator 6.4.0
CloudComposerDeleteEnvironmentOperator 6.4.0
CloudComposerUpdateEnvironmentOperator 6.4.0

BigQuery Operators

演算子名必要な apache-airflow-providers-google のバージョン
BigQueryCheckOperator 8.4.0
BigQueryValueCheckOperator 8.4.0
BigQueryIntervalCheckOperator 8.4.0
BigQueryGetDataOperator 8.4.0
BigQueryInsertJobOperator 8.4.0

BigQuery Data Transfer Service の演算子

演算子名必要な apache-airflow-providers-google のバージョン
BigQueryDataTransferServiceStartTransferRunsOperator 8.9.0

Cloud Build の演算子

演算子名必要な apache-airflow-providers-google のバージョン
CloudBuildCreateBuildOperator 8.7.0

Cloud SQL の演算子

演算子名必要な apache-airflow-providers-google のバージョン
CloudSQLExportInstanceOperator 10.3.0

Dataflow の演算子

演算子名必須の apache-airflow-providers-google バージョン
DataflowTemplatedJobStartOperator 8.9.0
DataflowStartFlexTemplateOperator 8.9.0

Cloud Data Fusion の演算子

演算子名必要な apache-airflow-providers-google のバージョン
CloudDataFusionStartPipelineOperator 8.9.0

Google Kubernetes Engine オペレーター

演算子名必要な apache-airflow-providers-google のバージョン
GKEDeleteClusterOperator 9.0.0
GKECreateClusterOperator 9.0.0

AI Platform オペレーター

演算子名必須の apache-airflow-providers-google バージョン
MLEngineStartTrainingJobOperator 8.9.0

DAG で遅延可能な演算子を使用する

すべての Google Cloud オペレーターの一般的な規則では、deferrable ブール型パラメータを使用して遅延可能モードを有効にします。Google Cloud 演算子にこのパラメータがない場合、遅延可能モードで実行することはできません。他の演算子では、異なる規則が適用される場合があります。たとえば、一部のコミュニティ演算子には、名前に Async 接尾辞が付いた別のクラスがあります。

次の DAG の例では、遅延可能モードで DataprocSubmitJobOperator 演算子を使用しています。

PYSPARK_JOB = {
    "reference": { "project_id": "PROJECT_ID" },
    "placement": { "cluster_name": "PYSPARK_CLUSTER_NAME" },
    "pyspark_job": {
        "main_python_file_uri": "gs://dataproc-examples/pyspark/hello-world/hello-world.py"
    },
}

DataprocSubmitJobOperator(
        task_id="dataproc-deferrable-example",
        job=PYSPARK_JOB,
        deferrable=True,
    )

トリガーログを表示する

トリガーは、他の環境コンポーネントのログとともに使用可能なログを生成します。環境ログの表示について詳しくは、ログを表示をご覧ください。

トリガーをモニタリングする

トリガー コンポーネントのモニタリングの詳細については、Airflow の指標をご覧ください。

トリガーのモニタリングに加えて、環境のモニタリング ダッシュボードの [Uncompleted Task] 指標で遅延可能なタスクの数を確認できます。

次のステップ