Troubleshooting DAGs

Cloud Composer 1 | Cloud Composer 2

This page provides troubleshooting steps and information for common workflow issues.

Many DAG execution issues are caused by non-optimal environment performance. You can optimize your Cloud Composer 2 environment by following the Optimize environment performance and costs guide.

Some DAG execution issues might be caused by the Airflow scheduler not working correctly or optimally. To solve these issues, follow the Scheduler troubleshooting instructions.

Troubleshooting workflow

To begin troubleshooting:

  1. Check the Airflow logs.

    You can increase the logging level of Airflow by overriding the following Airflow configuration option.

    Airflow 2

    Section   Key             Value
    logging   logging_level   The default value is INFO. Set to DEBUG to get more verbosity in log messages.

    Airflow 1

    Section   Key             Value
    core      logging_level   The default value is INFO. Set to DEBUG to get more verbosity in log messages.
  2. Check the Monitoring Dashboard.

  3. Review Cloud Monitoring.

  4. In the Google Cloud console, check for errors on the pages for the components of your environment.

  5. In the Airflow web interface, check the DAG's Graph View for failed task instances.

    To change the orientation of the Graph View, override the following Airflow configuration option:

    Section     Key               Value
    webserver   dag_orientation   LR, TB, RL, or BT
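In Cloud Composer, Airflow configuration overrides such as the ones above can be applied with the gcloud composer environments update command and its --update-airflow-configs flag, which accepts comma-separated SECTION-KEY=VALUE pairs. The following Python sketch only formats that flag value; the helper name build_config_overrides is hypothetical, and the sketch does not call gcloud itself.

```python
def build_config_overrides(overrides):
    """Format {(section, key): value} pairs as the comma-separated
    SECTION-KEY=VALUE list accepted by --update-airflow-configs."""
    pairs = []
    for (section, key), value in overrides.items():
        # The section and key are joined with a hyphen; the key keeps its underscores.
        pairs.append(f"{section}-{key}={value}")
    return ",".join(pairs)


flag_value = build_config_overrides({
    ("logging", "logging_level"): "DEBUG",  # Airflow 2; Airflow 1 uses ("core", "logging_level")
    ("webserver", "dag_orientation"): "LR",
})
print(f"--update-airflow-configs={flag_value}")
# --update-airflow-configs=logging-logging_level=DEBUG,webserver-dag_orientation=LR
```

You would pass the printed flag to gcloud composer environments update together with your environment name and its --location.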

Debugging operator failures

To debug an operator failure:

  1. Check for task-specific errors.
  2. Check the Airflow logs.
  3. Review Cloud Monitoring.
  4. Check operator-specific logs.
  5. Fix the errors.
  6. Upload the DAG to the dags/ folder.
  7. In the Airflow web interface, clear the past states for the DAG.
  8. Resume or run the DAG.

Troubleshooting task execution

Airflow is a distributed system with many components, such as the scheduler, executor, and workers, that communicate with each other through a task queue and the Airflow database, and send signals (such as SIGTERM). The following diagram shows an overview of interconnections between Airflow components.

Figure 1. Interaction between Airflow components

In a distributed system like Airflow, there might be network connectivity issues, or the underlying infrastructure might experience intermittent issues. This can lead to situations where tasks fail and are rescheduled for execution, or where tasks do not complete successfully (for example, Zombie tasks, or tasks that get stuck in execution). Airflow has mechanisms to deal with such situations and automatically resume normal functioning. The following sections explain common problems that occur during task execution by Airflow: Zombie tasks, Poison Pills, and SIGTERM signals.

Troubleshooting Zombie tasks

Airflow detects two kinds of mismatch between a task and a process that executes the task:

  • Zombie tasks are tasks that are supposed to be running but are not running. This might happen if the task's process was terminated or is not responding, if the Airflow worker didn't report a task status in time because it is overloaded, or if the VM where the task is executed was shut down. Airflow finds such tasks periodically, and either fails or retries the task, depending on the task's settings.

  • Undead tasks are tasks that are not supposed to be running. Airflow finds such tasks periodically and terminates them.

The most common reason for Zombie tasks is a shortage of CPU and memory resources in your environment's cluster. As a result, an Airflow worker might not be able to report the status of a task, or a sensor might be interrupted abruptly. In this case, the scheduler marks the task as a Zombie task. To avoid Zombie tasks, assign more resources to your environment.

For more information about scaling your Cloud Composer environment, see the Scaling environments guide. If you experience Zombie tasks, one possible solution is to increase the timeout for Zombie tasks. As a result, the scheduler waits longer before it considers a task to be a Zombie, which gives an Airflow worker more time to report the status of the task.

To increase the timeout for Zombie tasks, override the value of the [scheduler]scheduler_zombie_task_threshold Airflow configuration option:

Section     Key                               Value                      Notes
scheduler   scheduler_zombie_task_threshold   New timeout (in seconds)   The default value is 300
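The threshold behaves like a heartbeat timeout. The following Python sketch is a simplified model, not Airflow's implementation: it treats a running task as a Zombie when its last heartbeat is older than the threshold. The task IDs and timestamps are hypothetical.

```python
from datetime import datetime, timedelta


def find_zombies(last_heartbeats, now, threshold_seconds=300):
    """Return IDs of running tasks whose last heartbeat is older than the threshold.

    Simplified model of the check controlled by
    [scheduler]scheduler_zombie_task_threshold (default 300 seconds).
    """
    limit = timedelta(seconds=threshold_seconds)
    return sorted(
        task_id
        for task_id, heartbeat in last_heartbeats.items()
        if now - heartbeat > limit
    )


now = datetime(2024, 1, 1, 12, 0, 0)
heartbeats = {
    "extract": now - timedelta(seconds=60),     # healthy worker
    "transform": now - timedelta(seconds=400),  # overloaded worker reports late
}
print(find_zombies(heartbeats, now))                         # ['transform']
print(find_zombies(heartbeats, now, threshold_seconds=600))  # []
```

In this model, raising the threshold from 300 to 600 seconds gives the slow worker enough time to report, at the cost of detecting real Zombie tasks later.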

Troubleshooting Poison Pill

Poison Pill is a mechanism used by Airflow to shut down Airflow tasks.

Airflow uses Poison Pill in these situations:

  • When a scheduler terminates a task that did not complete on time.
  • When a task times out or is executed for too long.

When Airflow uses Poison Pill, you can see the following log entries in the logs of an Airflow worker that executed the task:

  INFO - Subtask ... WARNING - State of this instance has been externally set to success. Taking the poison pill.
  INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
  INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.

Possible solutions:

  • Check the task code for errors that might cause it to run for too long.
  • (Cloud Composer 2) Increase the CPU and memory for Airflow workers, so that tasks execute faster.
  • Increase the value of the [celery_broker_transport_options]visibility_timeout Airflow configuration option.

    As a result, the scheduler waits longer for a task to finish before considering it a Zombie task. This option is especially useful for time-consuming tasks that last many hours. If the value is too low (for example, 3 hours), the scheduler considers tasks that run for 5 or 6 hours as "hung" (Zombie tasks).

  • Increase the value of the [core]killed_task_cleanup_time Airflow configuration option.

    A longer value provides more time to Airflow workers to finish their tasks gracefully. If the value is too low, Airflow tasks might be interrupted abruptly, without enough time to finish their work gracefully.
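The visibility timeout example above comes down to comparing durations in seconds. The following Python sketch makes that comparison explicit under a simplifying assumption: a task is treated as at risk when its expected duration exceeds [celery_broker_transport_options]visibility_timeout. The task names and durations are hypothetical.

```python
HOUR = 3600  # seconds


def tasks_at_risk(expected_durations, visibility_timeout):
    """Return tasks whose expected duration (seconds) exceeds visibility_timeout (seconds)."""
    return [
        name
        for name, seconds in expected_durations.items()
        if seconds > visibility_timeout
    ]


durations = {
    "daily_export": 5 * HOUR,
    "full_rebuild": 6 * HOUR,
    "hourly_sync": 20 * 60,
}

print(tasks_at_risk(durations, visibility_timeout=3 * HOUR))  # ['daily_export', 'full_rebuild']
print(tasks_at_risk(durations, visibility_timeout=8 * HOUR))  # []
```

In this model, setting the timeout above the longest expected task duration removes the risk, which matches the advice above for tasks that last many hours.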

Troubleshooting SIGTERM signals

SIGTERM signals are used by Linux, Kubernetes, the Airflow scheduler, and Celery to terminate processes responsible for running Airflow workers or Airflow tasks.

There might be several reasons why SIGTERM signals are sent in an environment:

  • A task became a Zombie task and must be stopped.
  • The scheduler discovered a duplicate of a task and sends Poison Pill and SIGTERM signals to the task to stop it.
  • In Horizontal Pod Autoscaling, the GKE Control Plane sends SIGTERM signals to remove Pods that are no longer needed.
  • An Airflow component uses more resources (CPU, memory) than permitted by the cluster node.
  • The GKE service performs maintenance operations and sends SIGTERM signals to Pods that run on a node that is about to be upgraded.

When a task instance is terminated with SIGTERM, you can see the following log entries in the logs of an Airflow worker that executed the task:

  {local_task_job.py:211} WARNING - State of this instance has been externally set to queued. Terminating instance.
  {taskinstance.py:1411} ERROR - Received SIGTERM. Terminating subprocesses.
  {taskinstance.py:1703} ERROR - Task failed with exception
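To tell these cases apart when reading worker logs, you can search for the messages quoted in this section. The following Python sketch counts log lines that contain those substrings; the category names are hypothetical labels, and log lines in your environment might be formatted differently.

```python
# Substrings taken from the worker log entries quoted on this page.
PATTERNS = {
    "poison_pill": "Taking the poison pill",
    "externally_queued": "externally set to queued",
    "sigterm": "Received SIGTERM",
}


def classify_log_lines(lines):
    """Count how many lines contain each known termination message."""
    counts = {name: 0 for name in PATTERNS}
    for line in lines:
        for name, needle in PATTERNS.items():
            if needle in line:
                counts[name] += 1
    return counts


sample = [
    "{local_task_job.py:211} WARNING - State of this instance has been externally set to queued. Terminating instance.",
    "{taskinstance.py:1411} ERROR - Received SIGTERM. Terminating subprocesses.",
    "{taskinstance.py:1703} ERROR - Task failed with exception",
]
print(classify_log_lines(sample))
# {'poison_pill': 0, 'externally_queued': 1, 'sigterm': 1}
```

A Poison Pill shows up as poison_pill together with sigterm, while a SIGTERM that follows a state change to queued shows externally_queued together with sigterm.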

Possible solutions:

This issue happens when a VM that runs the task is out of memory. This is not related to Airflow configurations but to the amount of memory available to the VM.

Increasing the memory is dependent on the Cloud Composer version that you use. For example:

  • In Cloud Composer 2, you can assign more CPU and memory resources to Airflow workers.

  • In Cloud Composer 1, you can re-create your environment using a more powerful machine type.

  • In both versions of Cloud Composer, you can lower the value of the [celery]worker_concurrency Airflow configuration option. This option determines how many tasks are executed concurrently by a given Airflow worker.

For more information about optimizing your Cloud Composer 2 environment, see Optimize environment performance and costs.

Common issues

The following sections describe symptoms and potential fixes for some common DAG issues.

Task fails without emitting logs due to DAG parsing errors

Sometimes subtle DAG errors lead to a situation where the Airflow scheduler and the DAG processor are able to schedule tasks for execution and to parse a DAG file (respectively), but the Airflow worker fails to execute tasks from that DAG because the Python DAG file contains programming errors. As a result, an Airflow task might be marked as Failed with no log from its execution.

Solution: verify in the Airflow worker logs that the Airflow worker does not raise errors related to a missing DAG or to DAG parsing.

Task fails without emitting logs due to resource pressure

Symptom: during the execution of a task, the Airflow worker's subprocess responsible for executing the task is interrupted abruptly. The error visible in the Airflow worker's log might look similar to the following:

...
  File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__
    return self.run(*args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command
    _execute_in_fork(command_to_exec)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
    raise AirflowException('Celery command failed on host: ' + get_hostname())
airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...

Solution:

Task fails without emitting logs due to Pod eviction

Google Kubernetes Engine pods are subject to the Kubernetes Pod Lifecycle and pod eviction. Task spikes and co-scheduling of workers are the two most common causes of pod eviction in Cloud Composer.

Pod eviction can occur when a particular pod overuses resources of a node, relative to the configured resource consumption expectations for the node. For example, eviction might happen when several memory-heavy tasks run in a pod, and their combined load causes the node where this pod runs to exceed the memory consumption limit.

If an Airflow worker pod is evicted, all task instances running on that pod are interrupted, and later marked as failed by Airflow.

Logs are buffered. If a worker pod is evicted before the buffer flushes, logs are not emitted. A task failure without logs is an indication that the Airflow workers were restarted because they ran out of memory (OOM). Some logs might be present in Cloud Logging even though the Airflow logs were not emitted.

To view logs:

  1. In Google Cloud console, go to the Environments page.

    Go to Environments

  2. In the list of environments, click the name of your environment. The Environment details page opens.

  3. Go to the Logs tab.

  4. View logs of individual workers under All logs -> Airflow logs -> Workers -> (individual worker).

DAG execution is memory-limited. Each task execution starts with two Airflow processes: task execution and monitoring. Each node can take up to 6 concurrent tasks (approximately 12 processes loaded with Airflow modules). More memory can be consumed, depending on the nature of the DAG.
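The process count in the paragraph above is simple arithmetic: two processes per task, so 6 concurrent tasks mean about 12 processes. The following Python sketch extends it to a rough memory estimate. The 150 MB per process figure is a hypothetical placeholder, not a measured Cloud Composer value; substitute numbers observed in your own environment.

```python
# Each Airflow task runs as two processes: task execution and monitoring.
PROCESSES_PER_TASK = 2


def estimate_task_memory(concurrent_tasks, mb_per_process):
    """Return (process_count, estimated_mb) for tasks running on one node.

    This is a rough estimate: it ignores any extra memory that the task
    code itself consumes, which depends on the nature of the DAG.
    """
    processes = concurrent_tasks * PROCESSES_PER_TASK
    return processes, processes * mb_per_process


# Hypothetical assumption: each process loaded with Airflow modules uses about 150 MB.
processes, memory_mb = estimate_task_memory(concurrent_tasks=6, mb_per_process=150)
print(processes, memory_mb)  # 12 1800
```

If the estimate approaches the memory available on the node, eviction becomes more likely. Lowering [celery]worker_concurrency, as described in the SIGTERM section, reduces concurrent_tasks in this calculation.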

Symptom:

  1. In Google Cloud console, go to the Workloads page.

    Go to Workloads

  2. If there are airflow-worker pods that show Evicted, click each evicted pod and look for the "The node was low on resource: memory" message at the top of the window.

Fix:

  • In Cloud Composer 1, create a new Cloud Composer environment with a larger machine type than the current machine type.
  • In Cloud Composer 2, increase memory limits for Airflow workers.
  • Check logs from airflow-worker pods for possible eviction causes. For more information about fetching logs from individual pods, see Troubleshooting issues with deployed workloads.
  • Make sure that the tasks in the DAG are idempotent and retriable.
  • Avoid downloading unnecessary files to the local file system of Airflow workers.

    Airflow workers have limited local file system capacity. For example, in Cloud Composer 2, a worker can have from 1 GB to 10 GB of storage. When the storage space runs out, the Airflow worker pod is evicted by the GKE Control Plane. This fails all tasks that the evicted worker was executing.

    Examples of problematic operations:

    • Downloading files or objects and storing them locally in an Airflow worker. Instead, store these objects directly in a suitable service such as a Cloud Storage bucket.
    • Accessing big objects in the /data folder from an Airflow worker.
      The Airflow worker downloads the object into its local filesystem. Instead, implement your DAGs so that big files are processed outside of the Airflow worker Pod.

DAG load import timeout

Symptom:

  • In the Airflow web interface, at the top of the DAGs list page, a red alert box shows Broken DAG: [/path/to/dagfile] Timeout.
  • In Cloud Monitoring: The airflow-scheduler logs contain entries similar to:

    • ERROR - Process timed out
    • ERROR - Failed to import: /path/to/dagfile
    • AirflowTaskTimeout: Timeout

Fix:

Override the dag_file_processor_timeout Airflow configuration option and allow more time for DAG parsing:

Section   Key                          Value
core      dag_file_processor_timeout   New timeout value (in seconds)
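Before raising the timeout, it can help to measure how long a DAG file takes to import. The following Python sketch times a plain import of one file with importlib. It only approximates Airflow's parsing, it executes the file's top-level code (so run it on a test machine with the same dependencies installed, not against production resources), and the 50-second budget is an assumption; compare it with the [core]dag_file_processor_timeout value configured in your environment.

```python
import importlib.util
import time
from pathlib import Path


def time_dag_import(path, budget_seconds=50.0):
    """Import a DAG file as a standalone module and measure how long it takes.

    budget_seconds is an assumed parsing budget, not a value read from Airflow.
    Returns (elapsed_seconds, within_budget).
    """
    path = Path(path)
    spec = importlib.util.spec_from_file_location(path.stem, str(path))
    module = importlib.util.module_from_spec(spec)
    start = time.perf_counter()
    spec.loader.exec_module(module)  # executes the file's top-level code
    elapsed = time.perf_counter() - start
    return elapsed, elapsed <= budget_seconds


# Example (hypothetical path):
# elapsed, ok = time_dag_import("dags/example_dag.py")
```

A file that is slow to import usually does heavy work at module level, which is the pattern described later in this page for DAGs that take a long time to parse.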

Increased network traffic to and from the Airflow database

The amount of network traffic between your environment's GKE cluster and the Airflow database depends on the number of DAGs, the number of tasks in DAGs, and the way DAGs access data in the Airflow database. The following factors might influence network usage:

  • Queries to the Airflow database. If your DAGs do a lot of queries, they generate large amounts of traffic. Examples: checking the status of tasks before proceeding with other tasks, querying the XCom table, dumping Airflow database content.

  • Large number of tasks. The more tasks there are to schedule, the more network traffic is generated. This consideration applies both to the total number of tasks in your DAGs and to the scheduling frequency. When the Airflow scheduler schedules DAG runs, it makes queries to the Airflow database and generates traffic.

  • Airflow web interface. The Airflow web interface generates network traffic because it makes queries to the Airflow database. Intensively using pages with graphs, tasks, and diagrams can generate large volumes of network traffic.

DAG crashes the Airflow web server or causes it to return a 502 gateway timeout error

Web server failures can occur for several different reasons. Check the airflow-webserver logs in Cloud Logging to determine the cause of the 502 gateway timeout error.

Heavyweight computation

This section applies only to Cloud Composer 1.

Avoid running heavyweight computation at DAG parse time.

Unlike the worker and scheduler nodes, whose machine types can be customized to have greater CPU and memory capacity, the web server uses a fixed machine type, which can lead to DAG parsing failures if the parse-time computation is too heavyweight.

Note that the web server has 2 vCPUs and 2 GB of memory. The default value for [core]dagbag_import_timeout is 30 seconds. This timeout value defines the upper limit for how long Airflow spends loading a Python module in the dags/ folder.

Incorrect permissions

This section applies only to Cloud Composer 1.

The web server does not run under the same service account as the workers and scheduler. As such, the workers and scheduler might be able to access user-managed resources that the web server cannot access.

We recommend that you avoid accessing non-public resources during DAG parsing. Sometimes, this is unavoidable, and you will need to grant permissions to the web server's service account. The service account name is derived from your web server domain. For example, if the domain is example-tp.appspot.com, the service account is example-tp@appspot.gserviceaccount.com.
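The naming rule in the example above can be expressed as a small Python helper. It assumes, as the example shows, that the web server domain has the form NAME.appspot.com; the helper name web_server_service_account is hypothetical.

```python
def web_server_service_account(domain):
    """Derive the web server service account from its appspot.com domain,
    following the example: example-tp.appspot.com -> example-tp@appspot.gserviceaccount.com."""
    suffix = ".appspot.com"
    if not domain.endswith(suffix):
        raise ValueError(f"expected a domain ending in {suffix}: {domain!r}")
    prefix = domain[: -len(suffix)]
    return f"{prefix}@appspot.gserviceaccount.com"


print(web_server_service_account("example-tp.appspot.com"))
# example-tp@appspot.gserviceaccount.com
```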

DAG errors

This section applies only to Cloud Composer 1.

The web server runs on App Engine and is separate from your environment's GKE cluster. The web server parses the DAG definition files, and a 502 gateway timeout can occur if there are errors in the DAG. Airflow works normally without a functional web server if the problematic DAG is not breaking any processes running in GKE. In this case, as a workaround while the web server is unavailable, you can use gcloud composer environments run to retrieve details from your environment.

In other cases, you can run DAG parsing in GKE and look for DAGs that throw fatal Python exceptions or that time out (default 30 seconds). To troubleshoot, connect to a remote shell in an Airflow worker container and test for syntax errors. For more information, see Testing DAGs.
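As a quick first pass before connecting to a worker, you can check DAG files for syntax errors locally. The following Python sketch parses every .py file in a folder with the standard ast module. It only catches syntax errors, not import errors, missing dependencies, or slow top-level code, and the folder you point it at is up to you.

```python
import ast
from pathlib import Path


def find_syntax_errors(dags_folder):
    """Return (file name, message) pairs for .py files that fail to parse."""
    problems = []
    for path in sorted(Path(dags_folder).rglob("*.py")):
        try:
            # Parsing does not execute the file, so this is safe to run anywhere.
            ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
        except SyntaxError as err:
            problems.append((path.name, f"line {err.lineno}: {err.msg}"))
    return problems


# Example (hypothetical local copy of your dags/ folder):
# for name, message in find_syntax_errors("dags"):
#     print(f"{name}: {message}")
```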

Handling a large number of DAGs and plugins in dags and plugins folders

The contents of the /dags and /plugins folders are synchronized from your environment's bucket to the local file systems of Airflow workers and schedulers.

The more data stored in these folders, the longer it takes to perform the synchronization. To address such situations:

  • Limit the number of files in the /dags and /plugins folders. Store only the minimum required files.

  • If possible, increase the disk space available to Airflow schedulers and workers.

  • If possible, increase CPU and memory of Airflow schedulers and workers, so that the sync operation is performed faster.

  • In case of a very large number of DAGs, divide DAGs into batches, compress them into zip archives and deploy these archives into the /dags folder. This approach speeds up the DAGs syncing process. Airflow components uncompress zip archives before processing DAGs.

  • Generating DAGs programmatically might also be a way to limit the number of DAG files stored in the /dags folder. See the section about Programmatic DAGs to avoid problems with scheduling and executing DAGs generated programmatically.
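The zip-archive approach above can be sketched with Python's standard zipfile module. This sketch assumes that DAG files should sit at the root of each archive (Airflow looks for DAG files at the top level of a packaged zip); the batch size and the dags_batch_NNN.zip naming are hypothetical choices.

```python
import zipfile
from pathlib import Path


def zip_dags_in_batches(dag_files, output_dir, batch_size=50):
    """Write DAG files into zip archives of at most batch_size files each.

    Each file is stored at the archive root (arcname is the bare file name).
    Returns the list of created archive paths.
    """
    dag_files = [Path(p) for p in dag_files]
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    archives = []
    for start in range(0, len(dag_files), batch_size):
        batch = dag_files[start:start + batch_size]
        archive = output_dir / f"dags_batch_{start // batch_size:03d}.zip"
        with zipfile.ZipFile(archive, "w", compression=zipfile.ZIP_DEFLATED) as zf:
            for path in batch:
                zf.write(path, arcname=path.name)
        archives.append(archive)
    return archives
```

You would then upload the resulting archives to the /dags folder of your environment's bucket instead of the individual files. Files with the same name in different source folders would collide inside one archive, so this sketch assumes unique file names.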

Do not schedule programmatically generated DAGs at the same time

Generating DAG objects programmatically from a DAG file is an efficient method to author many similar DAGs that only have small differences.

It's important not to schedule all such DAGs for execution at the same time. There is a high chance that Airflow workers do not have enough CPU and memory resources to execute all tasks that are scheduled at the same time.

To avoid issues with scheduling programmatic DAGs:

  • Increase worker concurrency and scale up your environment, so that it can execute more tasks simultaneously.
  • Generate DAGs so that their schedules are distributed evenly over time. This avoids scheduling hundreds of tasks at the same time and gives Airflow workers time to execute all scheduled tasks.
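One way to distribute schedules is to give each generated DAG a different start minute. The following Python sketch computes evenly spaced hourly cron expressions; the hourly cadence is an assumption, and you would pass each string as the schedule of the corresponding generated DAG (schedule_interval or schedule, depending on your Airflow version).

```python
def staggered_hourly_schedules(dag_count):
    """Return one hourly cron expression per DAG, spreading start minutes over the hour."""
    # Integer arithmetic keeps every minute in the range 0..59.
    return [f"{60 * i // dag_count} * * * *" for i in range(dag_count)]


print(staggered_hourly_schedules(4))
# ['0 * * * *', '15 * * * *', '30 * * * *', '45 * * * *']
```

With more than 60 DAGs, several DAGs share a minute, but the load is still spread across the hour rather than concentrated at minute 0.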

Error 504 when accessing the Airflow web server

See Error 504 when accessing the Airflow UI.

Lost connection to MySQL server during query exception is thrown during the task execution or right after it

Lost connection to MySQL / PostgreSQL server during query exceptions often happen when the following conditions are met:

  • Your DAG uses PythonOperator or a custom operator.
  • Your DAG makes queries to the Airflow database.

If several queries are made from a callable function, tracebacks might incorrectly point to the self.refresh_from_db(lock_for_update=True) line in the Airflow code; it is the first database query after the task execution. The actual cause of the exception happens before this, when an SQLAlchemy session is not properly closed.

SQLAlchemy sessions are scoped to a thread, so a session created in a callable function can later be continued inside the Airflow code. If there are significant delays between queries within one session, the connection might already be closed by the MySQL or PostgreSQL server. The connection timeout in Cloud Composer environments is set to approximately 10 minutes.

Fix:

  • Use the airflow.utils.db.provide_session decorator. This decorator provides a valid session to the Airflow database in the session parameter and correctly closes the session at the end of the function.
  • Do not use a single long-running function. Instead, move all database queries to separate functions, so that there are multiple functions with the airflow.utils.db.provide_session decorator. In this case, sessions are automatically closed after retrieving query results.
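The airflow.utils.db.provide_session decorator requires an Airflow installation, so the following Python sketch illustrates the same short-lived-session pattern with the standard sqlite3 module instead. The provide_connection decorator, the events table, and the database path are hypothetical; this is an analogy for how the fix works, not Airflow's implementation.

```python
import functools
import os
import sqlite3
import tempfile


def provide_connection(db_path):
    """Hypothetical decorator factory mimicking the provide_session pattern:
    each call gets its own connection, which is closed when the call returns."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if kwargs.get("connection") is not None:
                # The caller supplied a connection and is responsible for closing it.
                return func(*args, **kwargs)
            conn = sqlite3.connect(db_path)
            try:
                kwargs["connection"] = conn
                result = func(*args, **kwargs)
                conn.commit()
                return result
            finally:
                conn.close()  # never keep the connection open after the query
        return wrapper
    return decorator


DB_PATH = os.path.join(tempfile.mkdtemp(), "example.db")  # throwaway database for the demo


@provide_connection(DB_PATH)
def record_event(name, connection=None):
    connection.execute("CREATE TABLE IF NOT EXISTS events (name TEXT)")
    connection.execute("INSERT INTO events (name) VALUES (?)", (name,))


@provide_connection(DB_PATH)
def count_events(connection=None):
    return connection.execute("SELECT COUNT(*) FROM events").fetchone()[0]


# Each call opens and closes its own connection, so no connection stays
# open and idle while other work happens between the queries.
record_event("start")
record_event("finish")
print(count_events())  # 2
```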

Controlling execution time of DAGs, tasks and parallel executions of the same DAG

If you want to control how long a single DAG run of a particular DAG lasts, use the dagrun_timeout DAG parameter. For example, if you expect that a single DAG run (irrespective of whether it finishes with success or failure) must not last longer than 1 hour, set this parameter to 3600 seconds.

You can also control how long you allow for a single Airflow task to last. To do so, you can use execution_timeout.

If you want to control how many active DAG runs you have for a particular DAG, use the [core]max_active_runs_per_dag Airflow configuration option.

If you want to have only a single instance of a DAG run at a given moment, set the max_active_runs_per_dag option to 1.
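The dagrun_timeout DAG parameter and the execution_timeout operator parameter accept datetime.timedelta values. The following Python sketch only prepares those values as plain dictionaries, so it runs without Airflow; in a DAG file you would pass them to DAG(...) and to your operators (for example, through default_args). The 20-minute task limit is a hypothetical value.

```python
from datetime import timedelta

# Keyword arguments you might pass to DAG(...): a DAG run must finish within 1 hour.
dag_kwargs = {"dagrun_timeout": timedelta(hours=1)}

# Per-task limit, typically passed through default_args or to each operator.
default_args = {"execution_timeout": timedelta(minutes=20)}

print(dag_kwargs["dagrun_timeout"].total_seconds())       # 3600.0
print(default_args["execution_timeout"].total_seconds())  # 1200.0
```

A task limit shorter than the DAG run limit lets a single stuck task fail on its own before the whole run is timed out.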

Issues impacting DAGs and plugins syncing to schedulers, workers and web servers

Cloud Composer syncs the content of the /dags and /plugins folders to scheduler(s) and workers. Certain objects in the /dags and /plugins folders might prevent this synchronization from working correctly, or at least slow it down.

  • /dags folder is synced to schedulers and workers. This folder is not synced to web servers in Cloud Composer 2 or if you turn on DAG Serialization in Cloud Composer 1.

  • /plugins folder is synced to schedulers, workers and web servers.

You might encounter the following issues:

  • You uploaded gzip-compressed files that use compression transcoding to /dags and /plugins folders. It usually happens if you use the gsutil cp -Z command to upload data to the bucket.

    Solution: Delete the object that used compression transcoding and re-upload it to the bucket.

  • One of the objects is named '.'. Such an object is not synced to schedulers and workers, and it might stop syncing altogether.

    Solution: Rename the problematic object.

  • One of the objects in the /dags or /plugins folders has a / symbol at the end of its name. Such objects can mislead the syncing process, because the / symbol means that an object is a folder, not a file.

    Solution: Remove the / symbol from the name of the problematic object.

  • Don't store unnecessary files in /dags and /plugins folders.

    Sometimes the DAGs and plugins that you implement are accompanied by additional files, such as files storing tests for these components. These files are synced to workers and schedulers and increase the time needed to copy files to schedulers, workers, and web servers.

    Solution: Don't store any additional and unnecessary files in /dags and /plugins folders.
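Two of the naming problems above can be detected from object names alone before you upload. The following Python sketch checks a list of object names (relative to the bucket root) for an object named '.' and for a trailing / symbol. Compression transcoding is a property of object metadata, not of names, so this sketch does not detect it; the helper name is hypothetical.

```python
def find_problematic_names(object_names):
    """Flag object names under dags/ or plugins/ that can break syncing."""
    problems = []
    for name in object_names:
        if not name.startswith(("dags/", "plugins/")):
            continue
        base = name.rstrip("/").rsplit("/", 1)[-1]
        if base == ".":
            problems.append((name, "object named '.'"))
        elif name.endswith("/"):
            problems.append((name, "name ends with '/'"))
    return problems


names = ["dags/etl.py", "dags/.", "plugins/helpers/", "data/report.csv"]
for name, reason in find_problematic_names(names):
    print(f"{name}: {reason}")
```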

Done [Errno 21] Is a directory: '/home/airflow/gcs/dags/...' error is generated by schedulers and workers

This problem happens because objects can have overlapping namespaces in Cloud Storage, while schedulers and workers use traditional file systems. For example, it is possible to add both a folder and an object with the same name to an environment's bucket. When the bucket is synced to the environment's schedulers and workers, this error is generated, which can lead to task failures.

To fix this problem, make sure that there are no overlapping namespaces in the environment's bucket. For example, if both /dags/misc (a file) and /dags/misc/example_file.txt (another file) are in a bucket, an error is generated by the scheduler.
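The overlap in the example above can be found by comparing each object name with the folder prefixes implied by the other names. The following Python sketch does that for a list of object names; the helper name is hypothetical, and the names follow the example in the previous paragraph.

```python
def find_overlapping_names(object_names):
    """Return names that exist both as an object and as a folder prefix of another object."""
    names = set(object_names)
    prefixes = set()
    for name in names:
        parts = name.split("/")
        # Every leading path of an object name acts as a folder on a file system.
        for i in range(1, len(parts)):
            prefixes.add("/".join(parts[:i]))
    return sorted(names & prefixes)


print(find_overlapping_names(["dags/misc", "dags/misc/example_file.txt", "dags/etl.py"]))
# ['dags/misc']
```

Renaming or removing each reported object resolves the conflict, because a file system cannot hold a file and a folder with the same path.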

Transient interruptions when connecting to Airflow Metadata DB

Cloud Composer runs on top of distributed cloud infrastructure. This means that transient issues might appear from time to time and interrupt the execution of your Airflow tasks.

In such situations you might see the following error messages in Airflow workers' logs:

"Can't connect to MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"

or

"Can't connect to MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"

Such intermittent issues might be also caused by maintenance operations performed for your Cloud Composer environments.

Usually such errors are intermittent and if your Airflow tasks are idempotent and you have retries configured, you should be immune to them. You can also consider defining maintenance windows.
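Retries are configured per task. The following Python sketch shows default_args values you might pass to a DAG so that every task retries transient failures. The keys are standard Airflow operator parameters, but the specific counts and delays are hypothetical, and the sketch builds only a plain dictionary so that it runs without Airflow.

```python
from datetime import timedelta

default_args = {
    "retries": 3,                              # retry a failed task up to 3 times
    "retry_delay": timedelta(minutes=2),       # wait before retrying
    "retry_exponential_backoff": True,         # increase the wait between later retries
    "max_retry_delay": timedelta(minutes=15),  # upper bound for the growing wait
}
```

Retries only help if rerunning a task is safe, which is why the tasks need to be idempotent.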

One additional reason for such errors might be the lack of resources in your environment's cluster. In such cases, you might scale up or optimize your environment as described in Scaling environments or Optimizing your environment instructions.

A DAG is not visible in Airflow UI or DAG UI and the scheduler does not schedule it

The DAG processor parses each DAG before it can be scheduled by the scheduler and before a DAG becomes visible in the Airflow UI or DAG UI. The [core]dag_file_processor_timeout Airflow configuration option defines how much time the DAG processor has to parse a single DAG.

If a DAG is not visible in the Airflow UI or DAG UI:

  • Check the DAG processor logs to see whether the DAG processor is able to correctly process your DAG. In case of problems, you might see the following log entries in the DAG processor or scheduler logs:

    [2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for /usr/local/airflow/dags/example_dag.py with PID 21903 started at 2020-12-03T03:05:55.442709+00:00 has timed out, killing it.

This error might be caused by one of the following reasons:

  • Your DAG is not correctly implemented and the DAG processor is not able to parse it. In this case, correct your DAG.

  • Parsing your DAG takes more than the number of seconds defined in [core]dag_file_processor_timeout.

    In this case you can increase this timeout.

  • If your DAG takes a long time to parse, it can also mean that it is not implemented in an optimal way. For example, it might read many environment variables or perform calls to external services. If this is the case, optimize your DAG so that the DAG processor can process it quickly.

Symptoms of Airflow database being under heavy load

Sometimes in the Airflow worker logs you might see the following warning log entries.

For MySQL:

(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"

For PostgreSQL:

psycopg2.OperationalError: connection to server at ... failed

Such an error or warning might indicate that the Airflow database is overwhelmed by the number of queries that it must handle.

Possible solutions:

What's next