Troubleshooting DAGs

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

This page provides troubleshooting steps and information for common workflow issues.

Many DAG execution issues are caused by non-optimal environment performance. You can optimize your Cloud Composer 2 environment by following the Optimize environment performance and costs guide.

Some DAG execution issues might be caused by the Airflow scheduler not working correctly or optimally. Follow the Scheduler troubleshooting instructions to solve these issues.

Troubleshooting workflow

To begin troubleshooting:

  1. Check the Airflow logs.

    You can increase the logging level of Airflow by overriding the following Airflow configuration option.

    Airflow 2

    Section: logging
    Key: logging_level
    Value: The default value is INFO. Set to DEBUG to get more verbosity in log messages.

    Airflow 1

    Section: core
    Key: logging_level
    Value: The default value is INFO. Set to DEBUG to get more verbosity in log messages.
  2. Check the Monitoring Dashboard.

  3. Review Cloud Monitoring.

  4. In Google Cloud console, check for errors on the pages for the components of your environment.

  5. In the Airflow web interface, check the DAG's Graph View for failed task instances.

    To change the orientation of the Graph View, override the following Airflow configuration option:

    Section: webserver
    Key: dag_orientation
    Value: LR, TB, RL, or BT

Debugging operator failures

To debug an operator failure:

  1. Check for task-specific errors.
  2. Check the Airflow logs.
  3. Review Cloud Monitoring.
  4. Check operator-specific logs.
  5. Fix the errors.
  6. Upload the DAG to the dags/ folder.
  7. In the Airflow web interface, clear the past states for the DAG.
  8. Resume or run the DAG.

Troubleshooting task execution

Airflow is a distributed system with many entities, such as the scheduler, executor, and workers, that communicate with each other through a task queue and the Airflow database and send signals (like SIGTERM). The following diagram shows an overview of the interconnections between Airflow components.

Figure 1. Interaction between Airflow components

In a distributed system like Airflow, there might be network connectivity issues, or the underlying infrastructure might experience intermittent problems. This can lead to situations where tasks fail and are rescheduled for execution, or where tasks are not successfully completed (for example, Zombie tasks, or tasks that got stuck in execution). Airflow has mechanisms to deal with such situations and automatically resume normal functioning. The following sections explain common problems that occur during task execution by Airflow: Zombie tasks, Poison Pills, and SIGTERM signals.

Troubleshooting Zombie tasks

Airflow detects two kinds of mismatch between a task and a process that executes the task:

  • Zombie tasks are tasks that are supposed to be running but are not running. This might happen if the task's process was terminated or is not responding, if the Airflow worker didn't report a task status in time because it is overloaded, or if the VM where the task is executed was shut down. Airflow finds such tasks periodically, and either fails or retries the task, depending on the task's settings.

    Discover zombie tasks

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("airflow-scheduler")
    textPayload:"Detected zombie job"
  • Undead tasks are tasks that are not supposed to be running. Airflow finds such tasks periodically and terminates them.

The most common reasons and solutions for Zombie tasks are listed below.

Airflow worker ran out of memory

Each Airflow worker can run up to [celery]worker_concurrency task instances simultaneously. If the cumulative memory consumption of those task instances exceeds the memory limit for an Airflow worker, a random process on it is terminated to free up resources.

Sometimes, the shortage of memory on an Airflow worker can lead to malformed packets being sent during an SQLAlchemy session to the database, to a DNS server, or to any other service called by a DAG. In this case, the other end of the connection might reject or drop connections from the Airflow worker. For example:

"UNKNOWN:Error received from peer
{created_time:"2024-11-31T10:09:52.217738071+00:00", grpc_status:14,
grpc_message:"failed to connect to all addresses; last error: UNKNOWN:
ipv4:<ip address>:443: handshaker shutdown"}"

Discover Airflow worker out-of-memory events

resource.type="k8s_node"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
log_id("events")
jsonPayload.message:"Killed process"
jsonPayload.message:("airflow task" OR "celeryd")

Solutions:

Airflow worker was evicted

Pod evictions are a normal part of running workloads on Kubernetes. GKE evicts pods if they run out of storage or to free up resources for workloads with a higher priority.

Discover Airflow worker evictions

resource.type="k8s_pod"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
resource.labels.pod_name:"airflow-worker"
log_id("events")
jsonPayload.reason="Evicted"

Solutions:

Airflow worker was terminated

Airflow workers might be removed externally. If currently running tasks don't finish during a graceful termination period, they will be interrupted and could end up being detected as zombies.

Discover Airflow worker pod terminations

resource.type="k8s_cluster"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
protoPayload.methodName:"pods.delete"
protoPayload.response.metadata.name:"airflow-worker"

Possible scenarios and solutions:

  • Airflow workers are restarted during environment modifications, such as upgrades or package installation:

    Discover Composer environment modifications

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("cloudaudit.googleapis.com%2Factivity")

    You can perform such operations when no critical tasks are running, or you can enable task retries.

  • Various components might be temporarily unavailable during maintenance operations:

    Discover GKE maintenance operations

    resource.type="gke_nodepool"
    resource.labels.cluster_name="GKE_CLUSTER_NAME"
    protoPayload.metadata.operationType="UPGRADE_NODES"

    You can specify maintenance windows to minimize overlaps with the execution of critical tasks.

  • In Cloud Composer 2 versions earlier than 2.4.5, a terminating Airflow worker might ignore the SIGTERM signal and continue to execute tasks:

    Discover scaling down by Composer autoscaling

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("airflow-worker-set")
    textPayload:"Workers deleted"

    You can upgrade to a later Cloud Composer version where this issue is fixed.

Airflow worker was under heavy load

The amount of CPU and memory resources available to an Airflow worker is limited by the environment's configuration. If utilization gets close to the limits, it causes resource contention and unnecessary delays during task execution. In extreme situations, where resources are lacking over longer periods of time, this can cause zombie tasks.

Solutions:

Airflow database was under heavy load

The database is used by various Airflow components to communicate with each other and, in particular, to store heartbeats of task instances. Resource shortage on the database leads to longer query times and might affect task execution.

Sometimes, the following errors are present in an Airflow worker's logs:

(psycopg2.OperationalError) connection to server at <IP address>,
port 3306 failed: server closed the connection unexpectedly

This probably means the server terminated abnormally before or while
processing the request.

Solutions:

Airflow database was temporarily unavailable

An Airflow worker might take time to detect and gracefully handle intermittent errors, such as temporary connectivity issues. It might exceed the default zombie detection threshold.

Discover Airflow heartbeat timeouts

resource.type="cloud_composer_environment"
resource.labels.environment_name="ENVIRONMENT_NAME"
log_id("airflow-worker")
textPayload:"Heartbeat time limit exceeded"

Solutions:

  • Increase the timeout for zombie tasks by overriding the value of the [scheduler]scheduler_zombie_task_threshold Airflow configuration option:

    Section: scheduler
    Key: scheduler_zombie_task_threshold
    Value: New timeout (in seconds)
    Notes: The default value is 300

Troubleshooting Poison Pill

Poison Pill is a mechanism used by Airflow to shut down Airflow tasks.

Airflow uses Poison Pill in these situations:

  • When a scheduler terminates a task that did not complete on time.
  • When a task times out or is executed for too long.

When Airflow uses Poison Pill, you can see the following log entries in the logs of an Airflow worker that executed the task:

  INFO - Subtask ... WARNING - State of this instance has been externally set
  to success. Taking the poison pill.
  INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
  INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.

Possible solutions:

  • Check the task code for errors that might cause it to run for too long.
  • (Cloud Composer 2) Increase the CPU and memory for Airflow workers, so that tasks execute faster.
  • Increase the value of the [celery_broker_transport_options]visibility_timeout Airflow configuration option.

    As a result, the scheduler waits longer for a task to finish before considering it a Zombie task. This option is especially useful for time-consuming tasks that last many hours. If the value is too low (for example, 3 hours), then the scheduler considers tasks that run for 5 or 6 hours as "hung" (Zombie tasks).

  • Increase the value of the [core]killed_task_cleanup_time Airflow configuration option.

    A longer value provides more time to Airflow workers to finish their tasks gracefully. If the value is too low, Airflow tasks might be interrupted abruptly, without enough time to finish their work gracefully.

Troubleshooting SIGTERM signals

SIGTERM signals are used by Linux, Kubernetes, the Airflow scheduler, and Celery to terminate processes responsible for running Airflow workers or Airflow tasks.

There might be several reasons why SIGTERM signals are sent in an environment:

  • A task became a Zombie task and must be stopped.

  • The scheduler discovers a duplicate of a task and sends Poison Pill and SIGTERM signals to the task to stop it.

  • In Horizontal Pod Autoscaling, the GKE Control Plane sends SIGTERM signals to remove Pods that are no longer needed.

  • The scheduler can send SIGTERM signals to the DagFileProcessorManager process. Such SIGTERM signals are used by the scheduler to manage the DagFileProcessorManager process lifecycle and can be safely ignored.

    Example:

    Launched DagFileProcessorManager with pid: 353002
    Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: []
    Sending the signal Signals.SIGTERM to group 353002
    Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
    
  • A race condition between the heartbeat callback and exit callbacks in the local_task_job, which monitors the execution of the task. If the heartbeat detects that a task was marked as success, it cannot distinguish whether the task itself succeeded or Airflow was told to consider the task successful. Nonetheless, it terminates the task runner without waiting for it to exit.

    Such SIGTERM signals can be safely ignored. The task is already in the successful state and the execution of the DAG run as a whole will not be affected.

    The log entry Received SIGTERM. is the only difference between the regular exit and the termination of a task in the successful state.

    Figure 2. Race condition between the heartbeat and exit callbacks
  • An Airflow component uses more resources (CPU, memory) than permitted by the cluster node.

  • GKE service performs maintenance operations and sends SIGTERM signals to Pods that run on a node that is about to be upgraded. When a task instance is terminated with SIGTERM, you can see the following log entries in the logs of an Airflow worker that executed the task:

{local_task_job.py:211} WARNING - State of this instance has been externally set to queued. Terminating instance.
{taskinstance.py:1411} ERROR - Received SIGTERM. Terminating subprocesses.
{taskinstance.py:1703} ERROR - Task failed with exception

Possible solutions:

This issue happens when a VM that runs the task is out of memory. This is not related to Airflow configurations but to the amount of memory available to the VM.

Increasing the memory is dependent on the Cloud Composer version that you use. For example:

  • In Cloud Composer 2, you can assign more CPU and memory resources to Airflow workers.

  • In the case of Cloud Composer 1, you can re-create your environment using a more performant machine type.

  • In both versions of Cloud Composer, you can lower the value of the [celery]worker_concurrency Airflow configuration option. This option determines how many tasks are executed concurrently by a given Airflow worker.

For more information about optimizing your Cloud Composer 2 environment, see Optimize environment performance and costs.

Cloud Logging queries to discover reasons for Pod restarts or evictions

Cloud Composer environments use GKE clusters as the compute infrastructure layer. This section contains useful queries that can help you find the reasons for Airflow worker or Airflow scheduler restarts or evictions.

The queries presented below can be tuned in the following ways:

  • You can specify the time range of interest in Cloud Logging, for example, the last 6 hours or 3 days, or you can define your own custom time range.

  • You should specify your Cloud Composer cluster's name in CLUSTER_NAME.

  • You can also limit the search to a specific Pod by adding the POD_NAME.

Discover restarted containers

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
  

Alternative query to limit the results to a specific Pod:

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
  

Discover containers shut down as a result of an out-of-memory event

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    

Alternative query to limit the results to a specific Pod:

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

Discover containers that stopped executing

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    

Alternative query to limit the results to a specific Pod:

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

Impact of update or upgrade operations on Airflow task executions

Update or upgrade operations interrupt currently executing Airflow tasks, unless a task is executed in the deferrable mode.

We recommend performing these operations when you expect minimal impact on Airflow task executions, and setting up appropriate retry mechanisms in your DAGs and tasks.
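
For illustration, the following minimal sketch shows one way to configure such retries in a DAG, assuming Airflow 2; the DAG id, schedule, and retry values are hypothetical examples, not recommended settings.

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    # Hypothetical retry settings; tune them for your own workloads.
    default_args = {
        "retries": 2,                          # retry tasks that were interrupted
        "retry_delay": timedelta(minutes=5),   # wait between retry attempts
    }

    with DAG(
        dag_id="example_retryable_dag",        # hypothetical DAG name
        start_date=datetime(2024, 1, 1),
        schedule_interval="@daily",
        catchup=False,
        default_args=default_args,
    ) as dag:
        BashOperator(
            task_id="idempotent_step",
            bash_command="echo 'safe to re-run'",  # the command itself must be idempotent
        )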

Troubleshooting KubernetesExecutor tasks

CeleryKubernetesExecutor is a type of executor in Cloud Composer 3 that can use CeleryExecutor and KubernetesExecutor at the same time.

See the Use CeleryKubernetesExecutor page for more information about troubleshooting tasks executed with KubernetesExecutor.

Common issues

The following sections describe symptoms and potential fixes for some common DAG issues.

Airflow task was interrupted by Negsignal.SIGKILL

Sometimes your task might be using more memory than is allocated to the Airflow worker. In such a situation, it might be interrupted by Negsignal.SIGKILL. The system sends this signal to avoid further memory consumption, which might impact the execution of other Airflow tasks. In the Airflow worker's log, you might see the following log entry:

{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL

Negsignal.SIGKILL might also appear as code -9.

Possible solutions:

  • Lower worker_concurrency of Airflow workers.

  • In the case of Cloud Composer 2, increase memory of Airflow workers.

  • In the case of Cloud Composer 1, upgrade to a bigger machine type used in the Cloud Composer cluster.

  • Optimize your tasks to use less memory.

  • Manage resource-intensive tasks in Cloud Composer by using KubernetesPodOperator or GKEStartPodOperator for task isolation and customized resource allocation (see the sketch after this list).
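
As a rough sketch of the last option, assuming Airflow 2 with the cncf.kubernetes provider installed, the example below isolates a memory-heavy job in its own Pod with dedicated resource requests. The DAG id, namespace, image, and resource values are placeholders, and the import path and the container_resources parameter depend on your provider version; check the KubernetesPodOperator documentation for your Cloud Composer version.

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
    from kubernetes.client import models as k8s

    with DAG(
        dag_id="isolated_heavy_task",              # hypothetical DAG name
        start_date=datetime(2024, 1, 1),
        schedule_interval=None,
        catchup=False,
    ) as dag:
        KubernetesPodOperator(
            task_id="memory_heavy_job",
            name="memory-heavy-job",
            namespace="composer-user-workloads",   # assumption; use the namespace that fits your setup
            image="us-docker.pkg.dev/PROJECT_ID/repo/heavy-job:latest",  # placeholder image
            cmds=["python", "process.py"],
            # Give the job its own memory instead of sharing the Airflow worker's memory.
            container_resources=k8s.V1ResourceRequirements(
                requests={"memory": "4Gi", "cpu": "1"},
                limits={"memory": "4Gi", "cpu": "1"},
            ),
        )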

Task fails without emitting logs due to DAG parsing errors

Sometimes there might be subtle DAG errors that lead to a situation where the Airflow scheduler and DAG processor are able to schedule tasks for execution and to parse the DAG file (respectively), but the Airflow worker fails to execute tasks from such a DAG because there are programming errors in the Python DAG file. This might lead to a situation where an Airflow task is marked as Failed and there is no log from its execution.

Solutions:

  • Verify in the Airflow worker logs that there are no errors raised by the Airflow worker related to a missing DAG or DAG parsing errors.

  • Increase parameters related to DAG parsing:

  • See also Inspecting DAG Processor logs.

Task fails without emitting logs due to resource pressure

Symptom: during execution of a task, the Airflow worker's subprocess responsible for Airflow task execution is interrupted abruptly. The error visible in the Airflow worker's log might look similar to the one below:

...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task    R = retval = fun(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__    return self.run(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command    _execute_in_fork(command_to_exec)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...

Solution:

Task fails without emitting logs due to Pod eviction

Google Kubernetes Engine pods are subject to the Kubernetes Pod Lifecycle and pod eviction. Task spikes and co-scheduling of workers are the two most common causes of pod eviction in Cloud Composer.

Pod eviction can occur when a particular pod overuses resources of a node, relative to the configured resource consumption expectations for the node. For example, eviction might happen when several memory-heavy tasks run in a pod, and their combined load causes the node where this pod runs to exceed the memory consumption limit.

If an Airflow worker pod is evicted, all task instances running on that pod are interrupted, and later marked as failed by Airflow.

Logs are buffered. If a worker pod is evicted before the buffer flushes, logs are not emitted. Task failure without logs is an indication that the Airflow workers are restarted due to out-of-memory (OOM). Some logs might be present in Cloud Logging even though the Airflow logs were not emitted.

To view logs:

  1. In Google Cloud console, go to the Environments page.

    Go to Environments

  2. In the list of environments, click the name of your environment. The Environment details page opens.

  3. Go to the Logs tab.

  4. View logs of individual workers under All logs -> Airflow logs -> Workers -> (individual worker).

DAG execution is memory-limited. Each task execution starts with two Airflow processes: task execution and monitoring. Each node can take up to 6 concurrent tasks (approximately 12 processes loaded with Airflow modules). More memory can be consumed, depending on the nature of the DAG.

Symptom:

  1. In Google Cloud console, go to the Workloads page.

    Go to Workloads

  2. If there are airflow-worker pods that show Evicted, click each evicted pod and look for the The node was low on resource: memory message at the top of the window.

Fix:

  • In Cloud Composer 1, create a new Cloud Composer environment with a larger machine type than the current machine type.
  • In Cloud Composer 2, increase memory limits for Airflow workers.
  • Check logs from airflow-worker pods for possible eviction causes. For more information about fetching logs from individual pods, see Troubleshooting issues with deployed workloads.
  • Make sure that the tasks in the DAG are idempotent and retriable.
  • Avoid downloading unnecessary files to the local file system of Airflow workers.

    Airflow workers have limited local file system capacity. For example, in Cloud Composer 2, a worker can have from 1 GB to 10 GB of storage. When the storage space runs out, the Airflow worker pod is evicted by GKE Control Plane. This fails all tasks that the evicted worker was executing.

    Examples of problematic operations:

    • Downloading files or objects and storing them locally in an Airflow worker. Instead, store these objects directly in a suitable service such as a Cloud Storage bucket (see the sketch after this list).
    • Accessing big objects in the /data folder from an Airflow worker. The Airflow worker downloads the object into its local filesystem. Instead, implement your DAGs so that big files are processed outside of the Airflow worker Pod.
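
As a minimal sketch of the first point, assuming the google-cloud-storage client library is available on the workers, the callable below reads an object directly from Cloud Storage instead of copying it to the worker's local file system first. The bucket and object names and the summarize_object helper are hypothetical.

    from google.cloud import storage

    def summarize_object(bucket_name: str, object_name: str) -> int:
        """Hypothetical helper: reads an object from Cloud Storage in memory
        instead of saving it to the Airflow worker's local file system."""
        client = storage.Client()
        blob = client.bucket(bucket_name).blob(object_name)
        # download_as_bytes() keeps the data in memory. For objects too big to
        # fit in worker memory, process them outside the worker Pod instead
        # (for example, in a KubernetesPodOperator task or another service).
        data = blob.download_as_bytes()
        return len(data)

Such a callable can then be used from a PythonOperator task instead of downloading the object into the /data folder or another local path first.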

DAG load import timeout

Symptom:

  • In the Airflow web interface, at the top of the DAGs list page, a red alert box shows Broken DAG: [/path/to/dagfile] Timeout.
  • In Cloud Monitoring: The airflow-scheduler logs contain entries similar to:

    • ERROR - Process timed out
    • ERROR - Failed to import: /path/to/dagfile
    • AirflowTaskTimeout: Timeout

Fix:

Override the dag_file_processor_timeout Airflow configuration option and allow more time for DAG parsing:

Section: core
Key: dag_file_processor_timeout
Value: New timeout value

DAG execution does not end within expected time

Symptom:

Sometimes a DAG run does not end because Airflow tasks get stuck and the DAG run lasts longer than expected. Under normal conditions, Airflow tasks do not stay indefinitely in the queued or running state, because Airflow has timeout and cleanup procedures that help avoid this situation.

Fix:

  • Use the dagrun_timeout parameter for the DAGs. For example: dagrun_timeout=timedelta(minutes=120). As a result, each DAG run must finish within the DAG run timeout, and unfinished tasks are marked as Failed or Upstream Failed. For more information about Airflow task states, see Apache Airflow documentation.

  • Use the task execution timeout parameter to define a default timeout for tasks that run based on Apache Airflow operators (see the sketch after this list).
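
The following minimal sketch, assuming Airflow 2, combines both parameters in one DAG; the DAG id, schedule, and timeout values are hypothetical.

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    with DAG(
        dag_id="example_timeout_dag",                  # hypothetical DAG name
        start_date=datetime(2024, 1, 1),
        schedule_interval="@daily",
        catchup=False,
        # Fail the DAG run if it does not finish within 120 minutes.
        dagrun_timeout=timedelta(minutes=120),
        # Default per-task timeout applied to every task in this DAG.
        default_args={"execution_timeout": timedelta(minutes=30)},
    ) as dag:
        BashOperator(
            task_id="bounded_task",
            bash_command="echo 'must finish within 30 minutes'",
        )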

DAG runs not executed

Symptom:

When a schedule date for a DAG is set dynamically, this can lead to various unexpected side effects. For example:

  • A DAG execution is always in the future, and the DAG is never executed.

  • Past DAG runs are marked as executed and successful despite not being executed.

More information is available in Apache Airflow documentation.

Fix:

  • Follow the recommendations in the Apache Airflow documentation.

  • Set a static start_date for DAGs. As an option, you can use catchup=False to disable running the DAG for past dates (see the sketch after this list).

  • Avoid using datetime.now() or days_ago(<number of days>) unless you are aware of the side effects of this approach.
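
For illustration, a minimal sketch of a DAG with a static start_date and catchup=False, assuming Airflow 2; the DAG id and schedule are hypothetical.

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    with DAG(
        dag_id="example_static_start_date",   # hypothetical DAG name
        # A fixed date instead of datetime.now() or days_ago(...).
        start_date=datetime(2024, 1, 1),
        schedule_interval="@daily",
        # Do not backfill DAG runs for past dates.
        catchup=False,
    ) as dag:
        BashOperator(task_id="hello", bash_command="echo hello")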

Increased network traffic to and from the Airflow database

The amount of network traffic between your environment's GKE cluster and the Airflow database depends on the number of DAGs, the number of tasks in DAGs, and the way DAGs access data in the Airflow database. The following factors might influence the network usage:

  • Queries to the Airflow database. If your DAGs do a lot of queries, they generate large amounts of traffic. Examples: checking the status of tasks before proceeding with other tasks, querying the XCom table, dumping Airflow database content.

  • Large number of tasks. The more tasks there are to schedule, the more network traffic is generated. This consideration applies both to the total number of tasks in your DAGs and to the scheduling frequency. When the Airflow scheduler schedules DAG runs, it makes queries to the Airflow database and generates traffic.

  • The Airflow web interface generates network traffic because it makes queries to the Airflow database. Intensively using pages with graphs, tasks, and diagrams can generate large volumes of network traffic.

DAG crashes the Airflow web server or causes it to return a 502 gateway timeout error

Web server failures can occur for several different reasons. Check the airflow-webserver logs in Cloud Logging to determine the cause of the 502 gateway timeout error.

Heavyweight computation

This section applies only to Cloud Composer 1.

Avoid running heavyweight computation at DAG parse time.

Unlike the worker and scheduler nodes, whose machine types can be customized to have greater CPU and memory capacity, the web server uses a fixed machine type, which can lead to DAG parsing failures if the parse-time computation is too heavyweight.

Note that the web server has 2 vCPUs and 2 GB of memory. The default value for [core]dagbag_import_timeout is 30 seconds. This timeout value defines the upper limit for how long Airflow spends loading a Python module in the dags/ folder.

Incorrect permissions

This section applies only to Cloud Composer 1.

The web server does not run under the same service account as the workers and scheduler. As such, the workers and scheduler might be able to access user-managed resources that the web server cannot access.

We recommend that you avoid accessing non-public resources during DAG parsing. Sometimes, this is unavoidable, and you will need to grant permissions to the web server's service account. The service account name is derived from your web server domain. For example, if the domain is example-tp.appspot.com, the service account is example-tp@appspot.gserviceaccount.com.

DAG errors

This section applies only to Cloud Composer 1.

The web server runs on App Engine and is separate from your environment's GKE cluster. The web server parses the DAG definition files, and a 502 gateway timeout can occur if there are errors in the DAG. Airflow works normally without a functional web server if the problematic DAG is not breaking any processes running in GKE. In this case, you can use gcloud composer environments run to retrieve details from your environment and as a workaround if the web server becomes unavailable.

In other cases, you can run DAG parsing in GKE and look for DAGs that throw fatal Python exceptions or that time out (default 30 seconds). To troubleshoot, connect to a remote shell in an Airflow worker container and test for syntax errors. For more information, see Testing DAGs.

Handling a large number of DAGs and plugins in dags and plugins folders

The contents of the /dags and /plugins folders are synchronized from your environment's bucket to the local file systems of Airflow workers and schedulers.

The more data stored in these folders, the longer it takes to perform the synchronization. To address such situations:

  • Limit the number of files in /dags and /plugins folders. Store only the minimum required files.

  • If possible, increase the disk space available to Airflow schedulers and workers.

  • If possible, increase CPU and memory of Airflow schedulers and workers, so that the sync operation is performed faster.

  • In case of a very large number of DAGs, divide DAGs into batches, compress them into zip archives and deploy these archives into the /dags folder. This approach speeds up the DAGs syncing process. Airflow components uncompress zip archives before processing DAGs.

  • Generating DAGs programmatically might also be a method for limiting the number of DAG files stored in the /dags folder. See the section about Programmatic DAGs to avoid problems with scheduling and executing DAGs generated programmatically.

Do not schedule programmatically generated DAGs at the same time

Generating DAG objects programmatically from a DAG file is an efficient method to author many similar DAGs that only have small differences.

It's important not to schedule all such DAGs for execution immediately. There is a high chance that Airflow workers do not have enough CPU and memory resources to execute all tasks that are scheduled at the same time.

To avoid issues with scheduling programmatic DAGs:

  • Increase worker concurrency and scale up your environment, so that it can execute more tasks simultaneously.
  • Generate DAGs in a way that distributes their schedules evenly over time, to avoid scheduling hundreds of tasks at the same time, so that Airflow workers have time to execute all scheduled tasks (see the sketch after this list).
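
As a sketch of the second point, assuming Airflow 2, the example below generates several similar DAGs from one file and staggers their schedules so that they do not all start at the same minute; the SOURCES list and DAG ids are hypothetical.

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    SOURCES = ["orders", "customers", "payments"]   # hypothetical list of similar pipelines

    for index, source in enumerate(SOURCES):
        with DAG(
            dag_id=f"load_{source}",
            start_date=datetime(2024, 1, 1),
            # Stagger the start minute so the generated DAGs are not all
            # scheduled at exactly the same time.
            schedule_interval=f"{(index * 10) % 60} * * * *",
            catchup=False,
        ) as dag:
            BashOperator(task_id="load", bash_command=f"echo loading {source}")

        # Register the generated DAG at module level so that Airflow discovers it.
        globals()[dag.dag_id] = dag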

Error 504 when accessing the Airflow web server

See Error 504 when accessing the Airflow UI.

Lost connection to Postgres / MySQL server during query exception is thrown during the task execution or right after it

Lost connection to Postgres / MySQL server during query exceptions often happen when the following conditions are met:

  • Your DAG uses PythonOperator or a custom operator.
  • Your DAG makes queries to the Airflow database.

If several queries are made from a callable function, tracebacks might incorrectly point to the self.refresh_from_db(lock_for_update=True) line in the Airflow code; it is the first database query after the task execution. The actual cause of the exception happens earlier, when an SQLAlchemy session is not properly closed.

SQLAlchemy sessions are scoped to a thread, and a session created in a callable function can later be continued inside the Airflow code. If there are significant delays between queries within one session, the connection might already be closed by the Postgres or MySQL server. The connection timeout in Cloud Composer environments is set to approximately 10 minutes.

Fix:

  • Use the airflow.utils.db.provide_session decorator. This decorator provides a valid session to the Airflow database in the session parameter and correctly closes the session at the end of the function.
  • Do not use a single long-running function. Instead, move all database queries to separate functions, so that there are multiple functions with the airflow.utils.db.provide_session decorator. In this case, sessions are automatically closed after retrieving query results (see the sketch after this list).
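
A minimal sketch of this approach, assuming Airflow 2, where the same decorator is also available as airflow.utils.session.provide_session; the count_running_tasks helper and the query it runs are hypothetical.

    from airflow.models import TaskInstance
    # In Airflow 2 the decorator is available here; older versions expose it
    # as airflow.utils.db.provide_session, as referenced above.
    from airflow.utils.session import provide_session

    @provide_session
    def count_running_tasks(dag_id, session=None):
        """Hypothetical helper: one short-lived query per function call."""
        return (
            session.query(TaskInstance)
            .filter(TaskInstance.dag_id == dag_id, TaskInstance.state == "running")
            .count()
        )

    def my_callable(**context):
        # Each decorated helper opens and closes its own session, so no
        # long-lived SQLAlchemy session stays open between queries.
        running = count_running_tasks(context["dag"].dag_id)
        print(f"{running} task instances are currently running")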

Controlling execution time of DAGs, tasks and parallel executions of the same DAG

If you want to control how long a single DAG execution for a particular DAG lasts, you can use the dagrun_timeout DAG parameter. For example, if you expect that a single DAG run (irrespective of whether execution finishes with success or failure) must not last longer than 1 hour, then set this parameter to 3600 seconds.

You can also control how long you allow for a single Airflow task to last. To do so, you can use execution_timeout.

If you want to control how many active DAG runs you want to have for a particular DAG, you can use the [core]max_active_runs_per_dag Airflow configuration option.

If you want to have only a single instance of a DAG run at a given moment, set the max_active_runs_per_dag parameter to 1.
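
As an alternative to the environment-wide configuration option, a single DAG can also cap its own concurrent runs with the max_active_runs DAG argument. A minimal sketch, assuming Airflow 2; the DAG id, schedule, and timeout are hypothetical.

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    with DAG(
        dag_id="example_single_active_run",   # hypothetical DAG name
        start_date=datetime(2024, 1, 1),
        schedule_interval="@hourly",
        catchup=False,
        max_active_runs=1,                    # only one DAG run at a given moment
        dagrun_timeout=timedelta(hours=1),    # each run must finish within 1 hour
    ) as dag:
        BashOperator(task_id="work", bash_command="echo working")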

Issues impacting DAGs and plugins syncing to schedulers, workers and web servers

Cloud Composer syncs the content of the /dags and /plugins folders to schedulers and workers. Certain objects in the /dags and /plugins folders might prevent this synchronization from working correctly or slow it down.

  • /dags folder is synced to schedulers and workers. This folder is not synced to web servers in Cloud Composer 2 or if you turn on DAG Serialization in Cloud Composer 1.

  • /plugins folder is synced to schedulers, workers and web servers.

You might encounter the following issues:

  • You uploaded gzip-compressed files that use compression transcoding to /dags and /plugins folders. It usually happens if you use the --gzip-local-all flag in a gcloud storage cp command to upload data to the bucket.

    Solution: Delete the object that used compression transcoding and re-upload it to the bucket.

  • One of the objects is named '.'. Such an object is not synced to schedulers and workers, and it might stop the syncing process altogether.

    Solution: Rename the problematic object.

  • A folder and a DAG Python file have the same names, for example a.py. In this case, the DAG file is not properly synced to Airflow components.

    Solution: Remove the folder that has the same name as a DAG Python file.

  • One of the objects in the /dags or /plugins folders contains a / symbol at the end of the object's name. Such objects can mislead the syncing process because the / symbol means that an object is a folder, not a file.

    Solution: Remove the / symbol from the name of the problematic object.

  • Don't store unnecessary files in /dags and /plugins folders.

    Sometimes DAGs and plugins that you implement are accompanied by additional files, such as files storing tests for these components. These files are synced to workers and schedulers and impact the time needed to copy these files to schedulers, workers, and web servers.

    Solution: Don't store any additional and unnecessary files in /dags and /plugins folders.

Done [Errno 21] Is a directory: '/home/airflow/gcs/dags/...' error is generated by schedulers and workers

This problem happens because objects can have overlapping namespaces in Cloud Storage, while at the same time schedulers and workers use traditional file systems. For example, it is possible to add both a folder and an object with the same name to an environment's bucket. When the bucket is synced to the environment's schedulers and workers, this error is generated, which can lead to task failures.

To fix this problem, make sure that there are no overlapping namespaces in the environment's bucket. For example, if both /dags/misc (a file) and /dags/misc/example_file.txt (another file) are in a bucket, an error is generated by the scheduler.

Transient interruptions when connecting to Airflow Metadata DB

Cloud Composer runs on top of distributed cloud infrastructure. This means that from time to time some transient issues might appear, and they might interrupt the execution of your Airflow tasks.

In such situations you might see the following error messages in Airflow workers' logs:

"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"

or

"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"

Such intermittent issues might be also caused by maintenance operations performed for your Cloud Composer environments.

Usually such errors are intermittent, and if your Airflow tasks are idempotent and you have retries configured, you should be immune to them. You can also consider defining maintenance windows.

One additional reason for such errors might be the lack of resources in your environment's cluster. In such cases, you might scale up or optimize your environment as described in the Scaling environments or Optimizing your environment instructions.

A DAG run is marked as successful but has no executed tasks

If a DAG run's execution_date is earlier than the DAG's start_date, you might see DAG runs that don't have any task runs but are still marked as successful.

Figure 3. A successful DAG run without executed tasks

Cause

This situation might happen in one of the following cases:

  • A mismatch is caused by the timezone difference between the DAG's execution_date and start_date. It might happen, for example, when using pendulum.parse(...) to set start_date.

  • The DAG's start_date is set to a dynamic value, for example, airflow.utils.dates.days_ago(1).

Solution

  • Make sure that execution_date and start_date are using the same timezone.

  • Specify a static start_date and combine it with catchup=False to avoid running DAGs with past start dates.

A DAG is not visible in Airflow UI or DAG UI and the scheduler does not schedule it

The DAG processor parses each DAG before it can be scheduled by the scheduler and before a DAG becomes visible in the Airflow UI or DAG UI.

The [core]dagbag_import_timeout and [core]dag_file_processor_timeout Airflow configuration options define timeouts for parsing DAGs.

If a DAG is not visible in the Airflow UI or DAG UI:

  • Check the DAG processor logs to see whether the DAG processor is able to correctly process your DAG. In case of problems, you might see the following log entries in the DAG processor or scheduler logs:

    [2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for
    /usr/local/airflow/dags/example_dag.py with PID 21903 started at
    2020-12-03T03:05:55.442709+00:00 has timed out, killing it.

  • Check the scheduler logs to see whether the scheduler works correctly. In case of problems, you might see the following log entries in the scheduler logs:

    DagFileProcessorManager (PID=732) last sent a heartbeat 240.09 seconds ago! Restarting it
    Process timed out, PID: 68496

Solutions:

  • Fix all DAG parsing errors. The DAG processor parses multiple DAGs, and in rare cases parsing errors of one DAG can negatively impact the parsing of other DAGs.

  • If the parsing of your DAG takes more than the amount of seconds defined in [core]dagbag_import_timeout, then increase this timeout.

  • If the parsing of all your DAGs takes more than the amount of seconds defined in [core]dag_file_processor_timeout, then increase this timeout.

  • If your DAG takes a long time to parse, it can also mean that it is not implemented in an optimal way. For example, it might read many environment variables, or perform calls to external services or the Airflow database. To the extent possible, avoid performing such operations in the global sections of DAGs.

  • Increase CPU and memory resources for Scheduler so it can work faster.

  • Adjust the number of schedulers.

  • Increase the number of DAG processor processes so that parsing can be done faster. You can do so by increasing the value of [scheduler]parsing_processes.

  • Lower the frequency of DAG parsing.

  • Lower the load on the Airflow database.

Symptoms of Airflow database being under heavy load

For more information, see Symptoms of Airflow Database being under load pressure.

What's next