This page explains how to enable support for Deferrable Operators in your environment and use deferrable Google Cloud operators in your DAGs.
About Deferrable Operators in Cloud Composer
If you have at least one triggerer instance (or at least two in highly resilient environments), you can use Deferrable Operators and Triggers in your DAGs.
For deferrable operators, Airflow splits task execution into the following stages:
1. Start the operation. In this stage, the task occupies an Airflow worker slot and performs an operation that delegates the job to a different service.
   For example, running a BigQuery job can take anywhere from a few seconds to several hours. After creating the job, the operation passes the work identifier (the BigQuery job ID) to an Airflow trigger.
2. The trigger monitors the job until it finishes. In this stage, no worker slot is occupied. The Airflow triggerer has an asynchronous architecture and can handle hundreds of such jobs. When the trigger detects that the job is finished, it sends an event that starts the last stage.
3. In the last stage, an Airflow worker executes a callback. For example, this callback can mark the task as successful, or execute another operation and hand the job back to the triggerer for monitoring.
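The following sketch models these three stages with Python's asyncio to show why deferral saves worker slots. It is a conceptual simulation, not Airflow's API: the worker pool is assumed to be an `asyncio.Semaphore` with a single slot, the job and its status polling are stand-ins built from `asyncio.sleep`, and all names are hypothetical.

```python
import asyncio

# Conceptual model only, not Airflow's API: worker slots are a semaphore and the
# triggerer is the asyncio event loop, which can wait on many jobs at once.
WORKER_SLOTS = 1


def _track(stats, key, delta):
    # Update a running counter and remember its peak value.
    stats[key] += delta
    stats[f"max_{key}"] = max(stats[f"max_{key}"], stats[key])


async def run_deferrable_task(job_id, duration, slots, stats):
    # Stage 1: occupy a worker slot only long enough to submit the job.
    async with slots:
        _track(stats, "slots_in_use", 1)
        await asyncio.sleep(0)  # stand-in for "create the job, get its ID"
        _track(stats, "slots_in_use", -1)

    # Stage 2: the trigger waits for the job without holding a worker slot.
    _track(stats, "deferred", 1)
    await asyncio.sleep(duration)  # stand-in for polling the job status
    _track(stats, "deferred", -1)

    # Stage 3: a short callback runs in a worker slot again.
    async with slots:
        _track(stats, "slots_in_use", 1)
        result = f"{job_id}: success"
        _track(stats, "slots_in_use", -1)
    return result


async def main(num_jobs=5, duration=0.05):
    slots = asyncio.Semaphore(WORKER_SLOTS)
    stats = {"slots_in_use": 0, "max_slots_in_use": 0, "deferred": 0, "max_deferred": 0}
    results = await asyncio.gather(
        *(run_deferrable_task(f"job-{i}", duration, slots, stats) for i in range(num_jobs))
    )
    return results, stats


if __name__ == "__main__":
    results, stats = asyncio.run(main())
    print(results)
    print(stats)
```

With a single worker slot, all jobs still wait concurrently in stage 2, because the waiting happens in the event loop rather than in a slot. In real Airflow code, an operator enters stage 2 by calling `self.defer(trigger=..., method_name=...)`, and a worker later runs the named callback method.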
The triggerer is stateless and therefore resilient to interruptions and restarts. Because of this, long-running jobs are resilient to pod restarts, unless the restart happens during the last stage, which is expected to be short.
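To illustrate why statelessness matters, the following simplified model stores each trigger only as its serialized constructor arguments, in the same `(classpath, kwargs)` shape that Airflow's `BaseTrigger.serialize()` returns. The class registry (`TRIGGER_CLASSES`) and the in-memory `TRIGGER_TABLE` are hypothetical stand-ins for Airflow's class import path and metadata database.

```python
from dataclasses import dataclass

# Hypothetical stand-ins: a "database" of serialized triggers and a class registry.
TRIGGER_TABLE = {}
TRIGGER_CLASSES = {}


def register(cls):
    TRIGGER_CLASSES[cls.__name__] = cls
    return cls


@register
@dataclass
class JobStatusTrigger:
    """Hypothetical trigger whose entire state is in its constructor arguments."""

    job_id: str
    poll_interval: float = 10.0

    def serialize(self):
        # Mirrors the shape of Airflow's BaseTrigger.serialize(): (classpath, kwargs).
        # Here the "classpath" is simply a key into TRIGGER_CLASSES.
        return type(self).__name__, {"job_id": self.job_id, "poll_interval": self.poll_interval}


def persist(trigger_id, trigger):
    TRIGGER_TABLE[trigger_id] = trigger.serialize()


def restore_all():
    """What a freshly restarted triggerer does: rebuild every trigger from storage."""
    return {
        trigger_id: TRIGGER_CLASSES[name](**kwargs)
        for trigger_id, (name, kwargs) in TRIGGER_TABLE.items()
    }
```

Because no state lives outside the serialized arguments, a restarted triggerer can rebuild every trigger and resume monitoring its job.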
Before you begin
Deferrable Operators and Sensors are available in Cloud Composer 2 environments and require the following:
- Cloud Composer 2.0.31 and later versions
- Airflow 2.2.5, 2.3.3, and later versions
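To check an existing environment against these requirements, you could parse its image version. This sketch assumes image version strings of the form `composer-2.0.31-airflow-2.2.5` (and also accepts a major-only Composer version such as `composer-3-airflow-2.9.3-build.3`), and it reads the Airflow requirement as "2.2.5 or later". Both readings are assumptions, and the helper name is hypothetical.

```python
import re

MIN_COMPOSER = (2, 0, 31)
# Assumption: "Airflow 2.2.5, 2.3.3, and later" is read as "2.2.5 or later".
MIN_AIRFLOW = (2, 2, 5)

# Assumed image version format, e.g. "composer-2.0.31-airflow-2.2.5".
# The Composer minor and patch parts are optional, e.g. "composer-3-airflow-2.9.3-build.3".
IMAGE_VERSION = re.compile(
    r"^composer-(\d+)(?:\.(\d+)\.(\d+))?-airflow-(\d+)\.(\d+)\.(\d+)"
)


def image_supports_deferrable(image_version: str) -> bool:
    """Return True if the image meets the minimum Composer and Airflow versions."""
    match = IMAGE_VERSION.match(image_version)
    if match is None:
        raise ValueError(f"Unrecognized image version: {image_version!r}")
    # Missing Composer minor/patch parts count as 0.
    parts = tuple(int(part or 0) for part in match.groups())
    composer, airflow = parts[:3], parts[3:]
    # Tuples compare element by element, so (2, 0, 31) < (2, 1, 0) as intended.
    return composer >= MIN_COMPOSER and airflow >= MIN_AIRFLOW
```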
Enable support for deferrable operators
An environment component called the Airflow triggerer asynchronously monitors all deferred tasks in your environment. After a deferred operation from such a task is completed, the triggerer passes the task to an Airflow worker.
You need at least one triggerer instance in your environment (or at least two in highly resilient environments) to use deferrable mode in your DAGs. You can configure the triggerers when you create an environment or adjust the number of triggerers and performance parameters for an existing environment.
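The triggerer requirement reduces to a simple rule. The following sketch encodes it and builds a `gcloud` command line that sets the triggerer count of an existing environment. The helper names are hypothetical, and the `--triggerer-count` flag of `gcloud composer environments update` is assumed here; confirm the flags against the gcloud reference for your Cloud Composer version before running the command.

```python
from __future__ import annotations


def min_triggerer_count(highly_resilient: bool) -> int:
    """At least one triggerer, or at least two in highly resilient environments."""
    return 2 if highly_resilient else 1


def triggerer_update_args(
    environment: str, location: str, count: int, highly_resilient: bool = False
) -> list[str]:
    """Build a gcloud command that sets the triggerer count (flag name assumed)."""
    required = min_triggerer_count(highly_resilient)
    if count < required:
        raise ValueError(f"Deferrable mode needs at least {required} triggerer(s), got {count}")
    return [
        "gcloud", "composer", "environments", "update", environment,
        "--location", location,
        f"--triggerer-count={count}",
    ]
```

You could pass the result to `subprocess.run(..., check=True)`; the environment name and location in any such call (for example `example-environment` and `us-central1`) are placeholders.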
Google Cloud operators that support deferrable mode
Only some Airflow operators have been extended to support the deferrable model.
The following list is a reference for the operators in the airflow.providers.google.cloud.operators package that support the deferrable mode.
The column with the minimum required apache-airflow-providers-google package version represents the earliest package version where that operator supports deferrable mode.
Cloud Composer operators
Operator name | Required apache-airflow-providers-google version |
---|---|
CloudComposerCreateEnvironmentOperator | 6.4.0 |
CloudComposerDeleteEnvironmentOperator | 6.4.0 |
CloudComposerUpdateEnvironmentOperator | 6.4.0 |
BigQuery operators
Operator name | Required apache-airflow-providers-google version |
---|---|
BigQueryCheckOperator | 8.4.0 |
BigQueryValueCheckOperator | 8.4.0 |
BigQueryIntervalCheckOperator | 8.4.0 |
BigQueryGetDataOperator | 8.4.0 |
BigQueryInsertJobOperator | 8.4.0 |
BigQuery Data Transfer Service operators
Operator name | Required apache-airflow-providers-google version |
---|---|
BigQueryDataTransferServiceStartTransferRunsOperator | 8.9.0 |
Cloud Build operators
Operator name | Required apache-airflow-providers-google version |
---|---|
CloudBuildCreateBuildOperator | 8.7.0 |
Cloud SQL operators
Operator name | Required apache-airflow-providers-google version |
---|---|
CloudSQLExportInstanceOperator | 10.3.0 |
Dataflow operators
Operator name | Required apache-airflow-providers-google version |
---|---|
DataflowTemplatedJobStartOperator | 8.9.0 |
DataflowStartFlexTemplateOperator | 8.9.0 |
Cloud Data Fusion operators
Operator name | Required apache-airflow-providers-google version |
---|---|
CloudDataFusionStartPipelineOperator | 8.9.0 |
Dataproc operators
Operator name | Required apache-airflow-providers-google version |
---|---|
DataprocCreateClusterOperator | 8.9.0 |
DataprocDeleteClusterOperator | 8.9.0 |
DataprocJobBaseOperator | 8.4.0 |
DataprocInstantiateWorkflowTemplateOperator | 9.0.0 |
DataprocInstantiateInlineWorkflowTemplateOperator | 10.1.0 |
DataprocSubmitJobOperator | 8.4.0 |
DataprocUpdateClusterOperator | 8.9.0 |
DataprocCreateBatchOperator | 8.9.0 |
Google Kubernetes Engine operators
Operator name | Required apache-airflow-providers-google version |
---|---|
GKEDeleteClusterOperator | 9.0.0 |
GKECreateClusterOperator | 9.0.0 |
AI Platform operators
Operator name | Required apache-airflow-providers-google version |
---|---|
MLEngineStartTrainingJobOperator | 8.9.0 |
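To check programmatically whether an installed provider package supports deferrable mode for a given operator, you could compare versions against the tables above. The following sketch copies a subset of those tables into a dictionary; the helper names are hypothetical. Versions are compared as integer tuples, because plain string comparison would rank "10.3.0" below "8.4.0".

```python
from __future__ import annotations

import re

# A subset of the minimum apache-airflow-providers-google versions listed above.
MIN_PROVIDER_VERSION = {
    "CloudComposerCreateEnvironmentOperator": "6.4.0",
    "BigQueryInsertJobOperator": "8.4.0",
    "CloudBuildCreateBuildOperator": "8.7.0",
    "CloudSQLExportInstanceOperator": "10.3.0",
    "DataflowStartFlexTemplateOperator": "8.9.0",
    "DataprocSubmitJobOperator": "8.4.0",
    "DataprocInstantiateInlineWorkflowTemplateOperator": "10.1.0",
    "GKECreateClusterOperator": "9.0.0",
}


def parse_version(version: str) -> tuple[int, ...]:
    """Parse the leading numeric release part, e.g. "10.3.0rc1" -> (10, 3, 0)."""
    match = re.match(r"\d+(?:\.\d+)*", version)
    if match is None:
        raise ValueError(f"Unrecognized version: {version!r}")
    return tuple(int(part) for part in match.group(0).split("."))


def provider_supports_deferrable(operator: str, installed_version: str) -> bool:
    """Return True if the installed provider meets the operator's minimum version."""
    minimum = MIN_PROVIDER_VERSION.get(operator)
    if minimum is None:
        return False  # Not in the table: treat as not supporting deferrable mode.
    return parse_version(installed_version) >= parse_version(minimum)
```

Inside an environment, you could obtain the installed version with `importlib.metadata.version("apache-airflow-providers-google")`. This simple parser ignores pre-release suffixes such as `rc1`; the `packaging` library's `Version` class handles those more precisely.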
Use deferrable operators in your DAGs
A common convention for all Google Cloud operators is to enable the deferrable mode with the deferrable boolean parameter. If a Google Cloud operator does not have this parameter, then it cannot run in the deferrable mode. Other operators can have a different convention. For example, some community operators have a separate class with the Async suffix in the name.
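Because a Google Cloud operator supports deferrable mode only if it has the deferrable parameter, you can check for that parameter with Python's `inspect` module. The following sketch uses two hypothetical stand-in classes instead of real operators.

```python
import inspect


def accepts_deferrable(operator_cls) -> bool:
    """Return True if the operator's constructor declares a `deferrable` parameter."""
    return "deferrable" in inspect.signature(operator_cls.__init__).parameters


# Hypothetical stand-ins for real operator classes.
class SyncOnlyOperator:
    def __init__(self, *, task_id: str):
        self.task_id = task_id


class DeferrableCapableOperator:
    def __init__(self, *, task_id: str, deferrable: bool = False):
        self.task_id = task_id
        self.deferrable = deferrable
```

With the provider package installed, you could apply the same check to a real class such as `DataprocSubmitJobOperator`. The check looks only at the constructor signature, so operators that follow a different convention, such as a separate class with the Async suffix, report `False`.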
The following example DAG uses DataprocSubmitJobOperator in deferrable mode:
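```python
import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

# Replace PROJECT_ID, PYSPARK_CLUSTER_NAME, and REGION with your own values.
PYSPARK_JOB = {
    "reference": {"project_id": "PROJECT_ID"},
    "placement": {"cluster_name": "PYSPARK_CLUSTER_NAME"},
    "pyspark_job": {
        "main_python_file_uri": "gs://dataproc-examples/pyspark/hello-world/hello-world.py"
    },
}

with models.DAG("dataproc_deferrable_example", start_date=datetime.datetime(2024, 1, 1), schedule_interval=None):
    # With deferrable=True, the task frees its worker slot while the triggerer monitors the job.
    DataprocSubmitJobOperator(
        task_id="dataproc-deferrable-example",
        job=PYSPARK_JOB,
        region="REGION",  # required by DataprocSubmitJobOperator
        deferrable=True,
    )
```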
View triggerer logs
The triggerer generates logs that are available together with logs of other environment components. For more information about viewing your environment logs, see View logs.
Monitor triggerer
For more information about monitoring the triggerer component, see Airflow metrics.
In addition to monitoring the triggerer, you can check the number of deferred tasks in the Unfinished Task metrics on the Monitoring dashboard of your environment.