Troubleshooting Airflow Scheduler issues

This page provides troubleshooting steps and information for common issues with the Airflow Scheduler.

Identify the source of the issue

To begin troubleshooting, determine whether the issue happens at DAG parse time or while tasks are being processed at execution time. For more information about parse time and execution time, read Difference between DAG parse time and DAG execution time.

Inspecting DAG parse times

To verify if the issue happens at DAG parse time, follow these steps.

Console

  1. In the Cloud Console, go to the Environments page.

  2. Select your environment from the list.
  3. In the Monitoring tab, review the Total parse time for all DAG files chart in the DAG runs section and identify possible issues.

    Figure: The DAG runs section in the Composer Monitoring tab shows health metrics for the DAGs in your environment.

gcloud

Use the list_dags command with the -r flag to see the parse time for all your DAGs.

gcloud composer environments run ENVIRONMENT_NAME \
  --location ENVIRONMENT_LOCATION \
  list_dags -- -r

Replace the following:

  • ENVIRONMENT_NAME: The name of your environment
  • ENVIRONMENT_LOCATION: The region of your environment

The output of the command looks similar to the following:

-------------------------------------------------------------------
DagBag loading stats for /home/airflow/gcs/dags
-------------------------------------------------------------------
Number of DAGs: 5
Total task number: 13
DagBag parsing time: 0.6765180000000001
-----------+----------+---------+----------+-----------------------
file       | duration | dag_num | task_num | dags
-----------+----------+---------+----------+-----------------------
/dag_1.py  | 0.6477   |       1 |        2 | ['dag_1']
/dag_2.py  | 0.018652 |       1 |        2 | ['dag_2']
/dag_3.py  | 0.004024 |       1 |        6 | ['dag_3']
/dag_4.py  | 0.003476 |       1 |        2 | ['dag_4']
/dag_5.py  | 0.002666 |       1 |        1 | ['dag_5']
-----------+----------+---------+----------+-----------------------

Look for the DagBag parsing time value. A large value might indicate that one of your DAGs is not implemented in an optimal way. From the output table, you can identify which DAGs have a long parsing time.
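If you have many DAG files, it can help to rank the report rows instead of reading them by eye. The following is a minimal sketch, assuming the report has the same column layout as the sample output above and that the duration column is in seconds. The `slowest_files` function and the 0.1 threshold are illustrative choices, not part of Airflow or gcloud.

```python
import re

# A shortened copy of the report table shown above.
SAMPLE_REPORT = """\
-----------+----------+---------+----------+-----------------------
file       | duration | dag_num | task_num | dags
-----------+----------+---------+----------+-----------------------
/dag_1.py  | 0.6477   |       1 |        2 | ['dag_1']
/dag_2.py  | 0.018652 |       1 |        2 | ['dag_2']
/dag_3.py  | 0.004024 |       1 |        6 | ['dag_3']
-----------+----------+---------+----------+-----------------------
"""

# Matches data rows only: a path ending in .py, then the duration column.
ROW = re.compile(r"^\s*(?P<file>\S+\.py)\s*\|\s*(?P<duration>[0-9.]+)\s*\|")


def slowest_files(report, threshold_seconds=0.1):
    """Return (file, duration) pairs at or above the threshold, slowest first."""
    rows = []
    for line in report.splitlines():
        match = ROW.match(line)
        if match:
            rows.append((match.group("file"), float(match.group("duration"))))
    slow = [row for row in rows if row[1] >= threshold_seconds]
    return sorted(slow, key=lambda row: row[1], reverse=True)


if __name__ == "__main__":
    for path, seconds in slowest_files(SAMPLE_REPORT):
        print(f"{path}: {seconds:.4f}s")
```

In this sample, only /dag_1.py crosses the threshold, which matches what the table suggests: one file accounts for almost all of the total parsing time.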

Monitoring running and queued tasks

To check if you have tasks stuck in a queue, follow these steps.

  1. In the Cloud Console, go to the Environments page.

  2. Select your environment from the list.
  3. In the Monitoring tab, review the Running and queued tasks chart in the DAG runs section and identify possible issues.

Troubleshooting issues at DAG parse time

The following sections describe symptoms and potential fixes for some common issues at DAG parse time.

Limited number of threads

Allowing the DAG processor manager (the part of the Scheduler that processes DAG files) to use only a limited number of threads might impact your DAG parse time. To solve the issue, apply the following changes to the airflow.cfg configuration file:

  • For Airflow 1.10.12 and earlier versions, use the max_threads parameter:

    [scheduler]
    max_threads = <NUMBER_OF_CORES_IN_MACHINE - 1>
    
  • For Airflow 1.10.14 and later versions, use the parsing_processes parameter:

    [scheduler]
    parsing_processes = <NUMBER_OF_CORES_IN_MACHINE - 1>
    

Replace NUMBER_OF_CORES_IN_MACHINE with the number of cores in the machines used for the worker nodes. The resulting value is the number of cores minus one.
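The "number of cores minus one" rule can be sketched as a small helper. This is an illustration only: the function names are hypothetical, the core count must come from the machine type of your nodes (not from the machine where you run the script), and the version cutoff follows the two cases listed above.

```python
def recommended_parsing_processes(cores):
    """Apply the 'number of cores minus one' rule, never going below 1."""
    if cores is None or cores < 1:
        raise ValueError("cores must be a positive integer")
    return max(1, cores - 1)


def scheduler_config_line(cores, airflow_version):
    """Format the [scheduler] line for airflow.cfg.

    airflow_version is a tuple such as (1, 10, 14). Versions from 1.10.14
    on use parsing_processes; earlier versions use max_threads.
    """
    if airflow_version >= (1, 10, 14):
        name = "parsing_processes"
    else:
        name = "max_threads"
    return f"{name} = {recommended_parsing_processes(cores)}"


if __name__ == "__main__":
    print("[scheduler]")
    print(scheduler_config_line(cores=4, airflow_version=(1, 10, 14)))
```

The floor of 1 is a design choice in this sketch so that a single-core machine still gets one parsing process.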

Number and time distribution of tasks

The Airflow scheduler can struggle when it has to schedule a large number of small tasks. In such situations, consolidate the work into a smaller number of larger tasks.

Scheduling a large number of DAGs or tasks at the same time can also cause issues. To avoid this problem, distribute your tasks more evenly over time.
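One way to distribute hourly DAGs over time is to give each one a different start minute. The following sketch generates cron expressions that you could use as schedule intervals. The helper name and the DAG IDs are hypothetical, and the even spacing is only one possible strategy.

```python
def staggered_hourly_schedules(dag_ids, window_minutes=60):
    """Assign each DAG an hourly cron expression with a distinct start minute.

    Start minutes are spread evenly over the first window_minutes of the hour.
    With more DAGs than minutes in the window, some DAGs share a minute.
    """
    if not dag_ids:
        return {}
    if not 1 <= window_minutes <= 60:
        raise ValueError("window_minutes must be between 1 and 60")
    step = window_minutes / len(dag_ids)
    return {
        dag_id: f"{int(index * step)} * * * *"
        for index, dag_id in enumerate(dag_ids)
    }


if __name__ == "__main__":
    schedules = staggered_hourly_schedules(["dag_1", "dag_2", "dag_3", "dag_4"])
    for dag_id, cron in schedules.items():
        print(f"{dag_id}: {cron}")
```

With four DAGs, the start minutes are 0, 15, 30, and 45, so the scheduler never has to create all four DAG runs in the same minute.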

Troubleshooting issues with running and queued tasks

The following sections describe symptoms and potential fixes for some common issues with running and queued tasks.

Task queues are too long

In some cases, a task queue might be too long for the Scheduler. For information about how to optimize worker and Celery parameters, read about scaling your Cloud Composer environment together with your business.

Limited cluster resources

You might experience performance issues if the GKE cluster of your environment is too small to handle all your DAGs and tasks. In this case, try one of the following solutions:

  • Create a new environment with a machine type that provides more performance and migrate your DAGs to it.
  • Create more Cloud Composer environments and split the DAGs between them.
  • Change the machine type for GKE nodes, as described in Upgrading the machine type for GKE nodes. Since this procedure is error-prone, it is the least recommended option.
  • Upgrade the machine type of the Cloud SQL instance that runs the Airflow database in your environment, for example by using the gcloud composer environments update command. Low performance of the Airflow database might be the reason why the Scheduler is slow.

Avoid task scheduling during maintenance windows

You can define specific maintenance windows for your environment. During these time periods, maintenance events for Cloud SQL and GKE take place.

Avoid scheduling DAG runs during maintenance windows, because maintenance events might cause scheduling or execution issues.

Make the Airflow scheduler ignore unnecessary files

You can improve the performance of the Airflow scheduler by skipping unnecessary files in the DAGs folder. The Airflow scheduler ignores files and folders specified in the .airflowignore file.

To make the Airflow scheduler ignore unnecessary files:

  1. Create an .airflowignore file.
  2. In this file, list files and folders that should be ignored.
  3. Upload this file to the /dags folder in your environment's bucket.

For more information about the .airflowignore file format, see the Airflow documentation.
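To check which files a set of patterns would skip before you upload the file, you can model the matching locally. The sketch below is a simplified model, not Airflow's own code. It assumes the regular-expression syntax of Airflow 1.10, where each non-empty line is a pattern searched for anywhere in the file path. The file names and patterns are hypothetical.

```python
import re


def load_patterns(ignore_text):
    """Compile one regular expression per non-empty line, dropping '#' comments."""
    patterns = []
    for line in ignore_text.splitlines():
        line = re.sub(r"\s*#.*", "", line).strip()
        if line:
            patterns.append(re.compile(line))
    return patterns


def is_ignored(path, patterns):
    """Return True if any pattern matches somewhere in the path."""
    return any(pattern.search(path) for pattern in patterns)


# Hypothetical .airflowignore content.
IGNORE_TEXT = "\n".join([
    "# helper modules that define no DAGs",
    "utils/",
    r"_test\.py$",
])

if __name__ == "__main__":
    patterns = load_patterns(IGNORE_TEXT)
    for path in ["dags/utils/helpers.py", "dags/etl_test.py", "dags/etl.py"]:
        status = "ignored" if is_ignored(path, patterns) else "parsed"
        print(f"{path}: {status}")
```

Later Airflow versions can also use glob syntax, so check the documentation for the version that your environment runs.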

Airflow scheduler processes paused DAGs

Airflow users pause DAGs to avoid their execution. This saves processing cycles on Airflow workers.

However, the Airflow scheduler continues to parse paused DAGs. To improve Airflow scheduler performance, use .airflowignore or delete paused DAGs from the DAGs folder.

Usage of 'wait_for_downstream' in your DAGs

If you set the wait_for_downstream parameter to True in your DAGs, then an instance of a task does not start until the tasks that are immediately downstream of the previous instance of this task have succeeded. This means that the execution of tasks belonging to a certain DAG run might be slowed down by the execution of tasks from the previous DAG run. Read more about it in the Airflow documentation.
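The effect on consecutive DAG runs can be modeled in a few lines. This is a simplified sketch that does not use Airflow itself. It assumes that a task waits for its own previous instance and for the tasks immediately downstream of that instance, and it treats only the "success" state as finished. The task names are hypothetical.

```python
def can_start(task_id, previous_run_states, downstream):
    """Decide whether a task may start under a wait_for_downstream-style rule.

    previous_run_states maps task IDs to their state in the previous DAG run,
    or is None when there is no previous run.
    downstream maps each task ID to the IDs immediately downstream of it.
    """
    if previous_run_states is None:
        return True  # First DAG run: nothing to wait for.
    must_succeed = [task_id] + downstream.get(task_id, [])
    return all(previous_run_states.get(t) == "success" for t in must_succeed)


# Hypothetical three-task DAG: extract -> transform -> load.
DOWNSTREAM = {"extract": ["transform"], "transform": ["load"]}

if __name__ == "__main__":
    previous = {"extract": "success", "transform": "running", "load": None}
    # extract in the new run is blocked until the previous transform succeeds.
    print(can_start("extract", previous, DOWNSTREAM))
```

In this model, a slow transform task in one DAG run delays the extract task of the next run, even though extract itself finished long ago.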

Scaling Airflow configuration

Airflow provides configuration options that control how many tasks and DAGs Airflow can execute at the same time. To set these configuration options, override their values for your environment.

  • Worker Concurrency

    The [celery]worker_concurrency parameter controls the maximum number of tasks that an Airflow worker can execute at the same time. If you multiply the value of this parameter by the number of Airflow workers in your Cloud Composer environment, you get the maximum number of tasks that can be executed at a given moment in your environment. This number is limited by the [core]parallelism Airflow configuration option, which is described later in this section.

  • Max Active DAG Runs

    The [core]max_active_runs_per_dag Airflow configuration option controls the maximum number of active DAG runs per DAG. The scheduler does not create more DAG runs if it reaches this limit.

    If this parameter is set too low, you might encounter a problem where the scheduler throttles DAG execution because it cannot create more DAG run instances at a given moment.

  • DAG Concurrency

    The [core]dag_concurrency Airflow configuration option controls the maximum number of task instances that can run concurrently in each DAG. It is a DAG-level parameter.

    If this parameter is set too low, you might encounter a problem where the execution of a single DAG instance is slow because only a limited number of DAG tasks can be executed at a given moment.

  • Parallelism and pool size

    The [core]parallelism Airflow configuration option controls how many tasks the Airflow scheduler can queue in the Executor's queue after all dependencies for these tasks are met.

    This is a global parameter for the whole Airflow setup.

    Tasks are queued and executed within a pool. Cloud Composer environments use only one pool. The size of this pool controls how many tasks the scheduler can queue for execution at a given moment. If the pool size is too small, the scheduler cannot queue tasks for execution even though other thresholds are not met yet. These thresholds are defined by the [core]parallelism configuration option and by the [celery]worker_concurrency configuration option multiplied by the number of Airflow workers.

    You can configure the pool size in the Airflow UI (Menu > Admin > Pools). Adjust the pool size to the level of parallelism you expect in your environment.
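The limits described above combine so that the smallest one wins. The following sketch shows that arithmetic for the environment-wide limits only. It leaves out the per-DAG options, and the function name and sample values are hypothetical.

```python
def limiting_factor(parallelism, worker_concurrency, num_workers, pool_slots):
    """Return the name and value of the lowest environment-wide task limit."""
    limits = {
        "parallelism": parallelism,
        # Each worker runs up to worker_concurrency tasks at the same time.
        "worker capacity": worker_concurrency * num_workers,
        "pool size": pool_slots,
    }
    name = min(limits, key=limits.get)
    return name, limits[name]


if __name__ == "__main__":
    name, value = limiting_factor(
        parallelism=30, worker_concurrency=6, num_workers=3, pool_slots=128
    )
    print(f"At most {value} tasks can run at once, limited by {name}")
```

In this example, three workers with a concurrency of 6 give a capacity of 18 tasks, so raising parallelism or the pool size alone would not allow more tasks to run at the same time.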