Troubleshooting Airflow triggerer issues

Cloud Composer 1 | Cloud Composer 2

This page provides troubleshooting steps and information for common issues with the Airflow triggerer.

Blocking operations in trigger

Asynchronous tasks might occasionally become blocked in triggerers. In most cases, the problems come from insufficient triggerer resources or issues with custom asynchronous operator code.

Triggerer logs contain warning messages that can help you identify the root causes of decreased triggerer performance. Look for the following two warnings.

  1. Async thread blocked

    Triggerer's async thread was blocked for 1.2 seconds, likely due to the highly utilized environment.
    

    This warning indicates performance problems caused by a high volume of async tasks.

    Solution: To address this issue, allocate more resources to the triggerers, reduce the number of deferred tasks that are executed at the same time, or increase the number of triggerers in your environment. Keep in mind that even though triggerers handle deferrable tasks, it's the workers that are responsible for starting and eventually completing each task. If you are adjusting the number of triggerers, consider also scaling the number of your worker instances.

  2. A specific task blocked the async thread

    WARNING - Executing <Task finished coro=<TriggerRunner.run_trigger() done, defined at /opt/***/***/jobs/my-custom-code.py:609> result=None> took 0.401 second
    

    This warning points to a specific piece of operator code executed by Cloud Composer. By design, triggers should rely on the asyncio library to run operations in the background. A custom trigger implementation can fail to adhere to asyncio contracts, for example, by using the async and await keywords incorrectly in Python code.

    Solution: Inspect the code reported by the warning and check whether the async operation is properly implemented.
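The difference between blocking and non-blocking code in a trigger can be illustrated with plain asyncio, outside Airflow. The standalone sketch below is not Airflow's triggerer: the coroutine names and the max_loop_lag heartbeat helper are hypothetical. It shows how a synchronous call such as time.sleep() inside a coroutine delays every other coroutine on the same event loop, and two ways to avoid that.

```python
import asyncio
import time
from typing import Awaitable, Callable


async def blocking_trigger() -> str:
    # Anti-pattern: time.sleep() is synchronous, so it freezes the whole
    # event loop (and every other coroutine sharing it) for 0.5 seconds.
    time.sleep(0.5)
    return "done"


async def non_blocking_trigger() -> str:
    # Correct pattern: awaiting asyncio.sleep() yields control back to
    # the event loop while this coroutine waits.
    await asyncio.sleep(0.5)
    return "done"


async def threaded_trigger() -> str:
    # If a blocking library call is unavoidable, offload it to a worker
    # thread so the event loop keeps running (requires Python 3.9+).
    await asyncio.to_thread(time.sleep, 0.5)
    return "done"


async def max_loop_lag(
    trigger: Callable[[], Awaitable[str]], interval: float = 0.05
) -> float:
    """Run `trigger` next to a hypothetical heartbeat coroutine and return
    the largest delay, in seconds, between when a heartbeat was due and
    when it actually ran."""
    loop = asyncio.get_running_loop()
    worst = 0.0
    stop = False

    async def heartbeat() -> None:
        nonlocal worst
        while not stop:
            expected = loop.time() + interval
            await asyncio.sleep(interval)
            worst = max(worst, loop.time() - expected)

    hb = asyncio.create_task(heartbeat())
    await asyncio.sleep(0)  # let the heartbeat start before the trigger
    await trigger()
    stop = True
    await hb
    return worst


if __name__ == "__main__":
    for t in (blocking_trigger, non_blocking_trigger, threaded_trigger):
        lag = asyncio.run(max_loop_lag(t))
        print(f"{t.__name__}: worst heartbeat delay {lag:.2f}s")
```

Only the blocking variant delays the heartbeat by roughly the length of the sleep. In a real trigger, the same reasoning applies to synchronous network or database clients: prefer an async client, or offload the call to a thread.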

Too many triggers

The number of deferred tasks is visible in the task_count metric, which is also displayed on the Monitoring dashboard of your environment. Each trigger creates resources, such as connections to external systems, that consume memory.

Figure 1. Deferred tasks displayed on the Monitoring dashboard

Graphs of memory and CPU consumption indicate that insufficient resources cause triggerer restarts, because the liveness probe fails when heartbeats are missing:

Figure 2. Triggerer restarts because of insufficient resources

Solution: To address this issue, allocate more resources to the triggerers, reduce the number of deferred tasks that are executed at the same time, or increase the number of triggerers in your environment.

Crash of an Airflow worker during the callback execution

After the trigger finishes its execution, control returns to an Airflow worker, which runs a callback method in an execution slot. This phase is controlled by the Celery Executor, so the corresponding configuration and resource limits apply (such as parallelism or worker_concurrency).

If the callback method fails in the Airflow worker, the worker fails, or the worker that runs the method restarts, then the task is marked as FAILED. In this case, the retry operation re-executes the entire task, not only the callback method.

Infinite loop in a trigger

A custom trigger can be implemented in a way that entirely blocks the main triggerer loop, so that only the one broken trigger runs at a time. In this case, a warning is generated in the triggerer logs after the problematic trigger finishes.
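In Airflow, a trigger's run() method is an asynchronous generator that yields events, and it shares the triggerer's event loop with the other triggers. The following standalone sketch uses plain asyncio instead of Airflow classes; poll_until, demo, and the dictionary event are hypothetical stand-ins for a trigger's run loop and Airflow's TriggerEvent. It shows a polling loop that awaits between checks, so a second coroutine standing in for another trigger keeps running while the first one waits.

```python
import asyncio
from typing import AsyncIterator, Callable


async def poll_until(
    check: Callable[[], bool], poke_interval: float
) -> AsyncIterator[dict]:
    """Hypothetical trigger-style run loop: poll `check` and yield a single
    event once it returns True."""
    while not check():
        # Awaiting here hands control back to the event loop between polls,
        # so other coroutines keep running. A loop without any await (for
        # example, `while not check(): pass`) would monopolize the event
        # loop until the condition changed, or forever.
        await asyncio.sleep(poke_interval)
    yield {"status": "success"}


async def demo(duration: float = 0.3) -> tuple[dict, int]:
    """Run poll_until next to a coroutine that stands in for a second
    trigger, and count how often that second coroutine got to run."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + duration
    ticks = 0

    async def other_trigger() -> None:
        nonlocal ticks
        while loop.time() < deadline:
            ticks += 1
            await asyncio.sleep(0.01)

    other = asyncio.create_task(other_trigger())
    events = [e async for e in poll_until(lambda: loop.time() >= deadline, 0.05)]
    await other
    return events[0], ticks


if __name__ == "__main__":
    event, ticks = asyncio.run(demo())
    print(event, f"other coroutine ran {ticks} times")
```

If the await inside poll_until were replaced with a busy loop, the second coroutine could not run until the condition became true, which is the kind of starvation described above.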

Trigger class not found

Because the DAGs folder is not synchronized with the Airflow triggerer, trigger code that is defined inline in a DAG file is missing when the trigger is executed. The following error appears in the logs of the failed task:

ImportError: Module "PACKAGE_NAME" does not define a "CLASS_NAME" attribute/class

Solution: Install the missing trigger code in your environment as a package from PyPI, so that the triggerer can import it.
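To see why the class goes missing, it helps to know that a deferred task's trigger is stored as a dotted classpath plus keyword arguments (in Airflow, BaseTrigger.serialize() returns this pair), and the triggerer roughly imports the class from that path before running it. The standalone sketch below imitates that lookup with importlib. The name import_string mirrors a helper in Airflow's airflow.utils.module_loading module, but this simplified version is illustrative, not Airflow's code; MyCustomTrigger is a hypothetical class name.

```python
import importlib
from typing import Any


def import_string(dotted_path: str) -> Any:
    """Resolve a dotted path such as "my_package.triggers.MyTrigger" to the
    object it names. Simplified sketch, not Airflow's own implementation."""
    try:
        module_path, class_name = dotted_path.rsplit(".", 1)
    except ValueError:
        raise ImportError(f"{dotted_path} doesn't look like a module path") from None

    # Fails with ModuleNotFoundError (a subclass of ImportError) if the
    # module isn't importable on this machine, for example, a DAG file
    # that exists only in the DAGs folder.
    module = importlib.import_module(module_path)

    try:
        return getattr(module, class_name)
    except AttributeError:
        # Same wording as the error in the failed task's logs.
        raise ImportError(
            f'Module "{module_path}" does not define a "{class_name}" attribute/class'
        ) from None


if __name__ == "__main__":
    print(import_string("collections.OrderedDict"))
    try:
        import_string("collections.MyCustomTrigger")
    except ImportError as err:
        print(err)
```

Installing the package that contains the trigger class makes its module importable on the triggerer, so this kind of lookup succeeds.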

Warning message about the triggerer in Airflow UI

In some cases, after the triggerer is disabled, you might see the following warning message in the Airflow UI:

The triggerer does not appear to be running. Last heartbeat was received 4 hours ago. Triggers will not run, and any deferred operator will remain deferred until it times out or fails.

Airflow can show this message because incomplete triggers remain in the Airflow database. This message usually means that the triggerer was disabled before all triggers were completed in your environment.

You can view all triggers that are running in the environment on the Browse > Triggers page in the Airflow UI (the Admin role is required).

Solutions:

Tasks remain in the deferred state after the triggerer is disabled

When the triggerer is disabled, tasks that are already in the deferred state remain in this state until the timeout is reached. This timeout can be infinite, depending on the Airflow and DAG configuration.

Use one of the following solutions:

  • Manually mark the tasks as failed.
  • Enable the triggerer to complete the tasks.

We recommend disabling the triggerer only if your environment does not run any deferred operators or tasks, and all deferred tasks are completed.

What's next