Monitor environments with Cloud Monitoring

You can use Cloud Monitoring and Cloud Logging with Cloud Composer.

Cloud Monitoring provides visibility into the performance, uptime, and overall health of cloud-powered applications. Cloud Monitoring collects and ingests metrics, events, and metadata from Cloud Composer to generate insights in dashboards and charts. You can use Cloud Monitoring to understand the performance and health of your Cloud Composer environments and Airflow metrics.

Cloud Logging captures logs produced by the scheduler and worker containers in your environment's cluster. These logs contain system-level and Airflow dependency information that helps with debugging. For information about viewing logs, see View Airflow logs.

Before you begin

  • The following permissions are required to access logs and metrics for your Cloud Composer environment:

    • Read-only access to logs and metrics: logging.viewer and monitoring.viewer
    • Read-only access to logs, including private logs: logging.privateLogViewer
    • Read/write access to metrics: monitoring.editor

    For more information about other permissions and roles for Cloud Composer, see Access control.

  • To avoid duplicate logging, Cloud Logging for Google Kubernetes Engine is disabled.

  • Cloud Logging produces an entry for each status and event that occurs in your Google Cloud project. You can use exclusion filters to reduce the volume of logs, including the logs that Cloud Logging produces for Cloud Composer.

    Excluding logs from can cause health check failures and CrashLoopBackOff errors. You must include in exclusion filters to prevent it from being excluded.

  • Monitoring cannot plot the count values for DAGs and tasks that execute more than once per minute, and does not plot metrics for failed tasks.

Environment metrics

You can use environment metrics to check the resource usage and health of your Cloud Composer environments.

Environment health

To check the health of your environment, you can use the following health status metric:

Cloud Composer runs a liveness DAG named airflow_monitoring, which runs on a schedule and reports environment health as follows:

  • If the liveness DAG run finishes successfully, the health status is True.
  • If the liveness DAG run fails, the health status is False.

The liveness DAG is stored in the dags/ folder and visible in the Airflow UI. The frequency and contents of the liveness DAG are immutable and must not be modified. Changes to the liveness DAG do not persist.
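The reporting rule above can be sketched in Python as follows. This is an illustration only, not Cloud Composer's implementation. The state names mirror Airflow's DAG run states, and treating a run that has not finished yet as producing no health sample is an assumption made for the sketch.

```python
from enum import Enum
from typing import Iterable, List, Optional


class DagRunState(str, Enum):
    """Subset of Airflow DAG run states used in this sketch."""
    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


def health_status(state: DagRunState) -> Optional[bool]:
    """Map one liveness DAG run to a health sample.

    True for a run that finished successfully, False for a failed run,
    None while the run has not finished (assumption: no sample yet).
    """
    if state is DagRunState.SUCCESS:
        return True
    if state is DagRunState.FAILED:
        return False
    return None


def health_series(states: Iterable[DagRunState]) -> List[bool]:
    """Keep only the samples produced by finished liveness DAG runs."""
    samples = (health_status(s) for s in states)
    return [s for s in samples if s is not None]
```

For example, a successful run, a running run, a failed run, and another successful run produce the series `[True, False, True]`.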

Environment dependency checks

Cloud Composer periodically checks that the environment can reach the services required for its operation and that it has enough permissions to interact with them. Examples of services required for the environment's operation are Artifact Registry, Cloud Logging, and Cloud Monitoring.

The following metrics are available for the environment dependency checks:

Dependency metric API Description
Number of dependency checks This metric tracks the number of times reachability checks are performed on services required for the environment's operation.
Number of dependency permissions checks This metric tracks the number of times permission checks are performed on services required for the environment's operation.

Database health

To check the health of your database, you can use the following health status metric:

The Airflow monitoring pod pings the database every minute and reports health status as True if a SQL connection can be established or False if not.
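A probe that follows the same pattern (try to open a SQL connection, then report True or False) might look like the sketch below. It uses Python's built-in sqlite3 module as a stand-in for the Airflow metadata database, which in Cloud Composer is a Cloud SQL instance; the function names and the 60-second default interval are illustrative assumptions, not the monitoring pod's actual code.

```python
import sqlite3
import time
from typing import List


def database_healthy(database: str, timeout_s: float = 5.0) -> bool:
    """Return True if a SQL connection can be established and used."""
    try:
        conn = sqlite3.connect(database, timeout=timeout_s)
    except sqlite3.Error:
        return False  # the connection could not be established
    try:
        conn.execute("SELECT 1").fetchone()  # trivial query to prove the link works
        return True
    except sqlite3.Error:
        return False
    finally:
        conn.close()


def probe(database: str, interval_s: float = 60.0, iterations: int = 1) -> List[bool]:
    """Ping the database `iterations` times, waiting `interval_s` between pings."""
    results = []
    for i in range(iterations):
        results.append(database_healthy(database))
        if i < iterations - 1:
            time.sleep(interval_s)
    return results
```

Running `probe(":memory:", interval_s=0, iterations=3)` returns one health sample per ping.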

Database metrics

The following environment metrics are available for the Airflow metadata database used by Cloud Composer environments. You can use these metrics to monitor the performance and resource usage of your environment's database instance.

For example, you might want to upgrade the Cloud SQL machine type of your environment if your environment approaches resource limits. Alternatively, to optimize costs related to Airflow metadata database usage, you might want to run a database cleanup that keeps storage under a certain threshold.

Database metric API Description
Database CPU usage
Database CPU cores
Database CPU utilization
Database Memory usage
Database Memory quota
Database Memory utilization
Database Disk usage
Database Disk quota
Database Disk utilization
Database Connections limit
Database Connections
Database available for failover Is True if the environment's Cloud SQL instance is in the high availability mode and is ready for failover.
Database automatic failover requests count Total number of auto-failover requests of the environment's Cloud SQL instance.
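The usage, quota, and utilization metrics above can be combined into a simple capacity check. The sketch below assumes that utilization is usage divided by quota (for example, Database CPU usage divided by Database CPU cores); the 0.8 threshold is a hypothetical value for illustration, not a Cloud Composer recommendation.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class ResourceSample:
    """One usage/quota pair, such as Database Disk usage and Database Disk quota."""
    usage: float
    quota: float

    @property
    def utilization(self) -> float:
        # Fraction of the quota in use (assumption: utilization = usage / quota).
        if self.quota <= 0:
            raise ValueError("quota must be positive")
        return self.usage / self.quota


def resources_near_limit(samples: Dict[str, ResourceSample], threshold: float = 0.8) -> List[str]:
    """Return the names of resources whose utilization is at or above `threshold`."""
    return sorted(name for name, s in samples.items() if s.utilization >= threshold)
```

A result that includes `cpu` or `memory` might point toward a larger Cloud SQL machine type, while `disk` might point toward a database cleanup.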

Scheduler metrics

Name API Description
Active schedulers Number of active scheduler instances.

Triggerer metrics

The following triggerer metrics are provided exclusively for Cloud Composer:

Name API Description
Active triggerers The number of active triggerer instances.

Additionally, the following Airflow metrics are available via Cloud Composer metrics:

Name API Name in Airflow Description
Total number of running triggers triggers.running The number of running triggers per triggerer instance.
Blocking triggers triggers.blocked_main_thread Number of triggers that blocked the main thread (likely because they are not fully asynchronous).
Failed triggers triggers.failed Number of triggers that failed with an error before they could fire an event.
Succeeded triggers triggers.succeeded Number of triggers that have fired at least one event.
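To illustrate what "not fully asynchronous" means for the Blocking triggers metric: Airflow's triggerer runs triggers as coroutines on an asyncio event loop, so a trigger that calls a synchronous, blocking function stalls every other trigger. The sketch below uses plain asyncio and does not use Airflow's trigger classes; the function names are hypothetical.

```python
import asyncio
import time


async def blocking_trigger(delay_s: float) -> str:
    # time.sleep() is synchronous: it holds the event loop for the whole delay,
    # so no other coroutine can make progress in the meantime.
    time.sleep(delay_s)
    return "fired"


async def async_trigger(delay_s: float) -> str:
    # asyncio.sleep() suspends this coroutine and hands control back to the loop.
    await asyncio.sleep(delay_s)
    return "fired"


async def _run_together(trigger, count: int, delay_s: float) -> float:
    start = time.perf_counter()
    await asyncio.gather(*(trigger(delay_s) for _ in range(count)))
    return time.perf_counter() - start


def elapsed_for(trigger, count: int = 3, delay_s: float = 0.1) -> float:
    """Run `count` copies of `trigger` concurrently and return the wall time in seconds."""
    return asyncio.run(_run_together(trigger, count, delay_s))


if __name__ == "__main__":
    print(f"blocking: {elapsed_for(blocking_trigger):.2f}s")
    print(f"async:    {elapsed_for(async_trigger):.2f}s")
```

Three asynchronous triggers that each wait 0.1 seconds finish in about 0.1 seconds in total, while the blocking versions take about 0.3 seconds because they run one after another.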

Web server metrics

The following environment metrics are available for the Airflow web server used by Cloud Composer environments. You can use these metrics to check the performance and resource usage of your environment's Airflow web server instance.

For example, you might want to upgrade the web server machine type if it consistently approaches resource limits.

Name API Description
Web server CPU usage
Web server CPU quota
Web server memory usage
Web server memory quota
Active web servers Number of active web server instances.
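Because a single spike is not the same as "consistently approaching resource limits", a check over a window of samples can help. The sketch below compares a series of web server usage samples (for example, memory usage) against the corresponding quota; the 0.9 threshold and the 0.8 minimum fraction are hypothetical values chosen for illustration.

```python
from typing import Sequence


def consistently_near_limit(
    usage: Sequence[float],
    quota: float,
    threshold: float = 0.9,
    min_fraction: float = 0.8,
) -> bool:
    """Return True if at least `min_fraction` of the samples use
    `threshold` or more of the quota."""
    if quota <= 0:
        raise ValueError("quota must be positive")
    if not usage:
        return False  # no samples, nothing to judge
    near = sum(1 for u in usage if u / quota >= threshold)
    return near / len(usage) >= min_fraction
```

With a quota of 2.0 and samples `[1.9, 1.85, 1.95, 1.0, 1.92]`, four of five samples are at or above 90% of the quota, so the function returns True.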

DAG metrics

To help you monitor the efficiency of your DAG runs and identify tasks that cause high latency, the following DAG metrics are available.

DAG metric API
Number of DAG runs
Duration of each DAG run
Number of task runs
Duration of each task run

Cloud Monitoring shows metrics only for completed workflow and task runs (success or failure). No data is displayed when there is no workflow activity, or for workflow and task runs that are still in progress.
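The rule that only finished runs are plotted can be illustrated with a small sketch. The `DagRun` class, its fields, and the set of finished states are hypothetical stand-ins, not Cloud Composer internals. A DAG whose runs are all still in progress has no entry in the result, which corresponds to No data in a chart.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional

# Run states that count as "completed" in this sketch.
FINISHED_STATES = {"success", "failed"}


@dataclass
class DagRun:
    dag_id: str
    state: str                   # for example "queued", "running", "success", "failed"
    duration_s: Optional[float]  # None while the run is still in progress


def finished_run_durations(runs: List[DagRun]) -> Dict[str, List[float]]:
    """Group the durations of finished runs by DAG; unfinished runs are skipped."""
    durations: Dict[str, List[float]] = {}
    for run in runs:
        if run.state not in FINISHED_STATES or run.duration_s is None:
            continue  # in-progress runs produce no data point
        durations.setdefault(run.dag_id, []).append(run.duration_s)
    return durations
```

The number of completed runs per DAG is then the length of each list.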

Celery Executor metrics

The following Celery Executor metrics are available. These metrics can help you determine if there are sufficient worker resources in your environment.

Celery Executor metric API
Number of tasks in the queue
Number of online Celery workers
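One way to read these two metrics together is to compare the queue length against the total number of worker slots. The heuristic below is a sketch under stated assumptions: `worker_concurrency` stands for the number of tasks one Celery worker runs at once (the Airflow `[celery] worker_concurrency` option), and `max_queued_per_slot` is a hypothetical tolerance, not a documented limit.

```python
def workers_look_insufficient(
    queued_tasks: int,
    online_workers: int,
    worker_concurrency: int,
    max_queued_per_slot: float = 1.0,
) -> bool:
    """Heuristic: flag a backlog that is large relative to the total worker slots."""
    if queued_tasks <= 0:
        return False  # nothing is waiting in the queue
    slots = online_workers * worker_concurrency
    if slots == 0:
        return True   # tasks are queued but no online worker can take them
    return queued_tasks / slots > max_queued_per_slot
```

For example, 40 queued tasks with 2 online workers and a concurrency of 16 gives 1.25 queued tasks per slot, which this heuristic flags.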

Airflow metrics

The following Airflow metrics are available. These metrics correspond to metrics provided by Airflow.

Name API Name in Airflow Description
Celery task non-zero exit codes celery.execute_command.failure Number of non-zero exit codes from Celery tasks.
Celery task publish timeouts celery.task_timeout_error Number of AirflowTaskTimeout errors raised when publishing a task to the Celery broker.
Serialized DAG fetch duration collect_db_dags Time taken for fetching all Serialized DAGs from the database.
DAG callback exceptions dag.callback_exceptions Number of exceptions raised from DAG callbacks. When this happens, a DAG callback is not working.
DAG refresh errors dag_file_refresh_error Number of failures when loading any DAG files.
DAG file load time dag_processing.last_duration.<dag_file> Time taken to load a specific DAG file.
Time since DAG file processing dag_processing.last_run.seconds_ago.<dag_file> Seconds since a DAG file was last processed.
DagFileProcessorManager stall count dag_processing.manager_stalls Number of stalled DagFileProcessorManager processes.
DAG parsing errors dag_processing.import_errors Number of errors generated when parsing DAG files.
Running DAG parsing processes dag_processing.processes Number of currently running DAG parsing processes.
Processor timeouts dag_processing.processor_timeouts Number of file processors that were killed because of taking too long.
Time taken to scan and import all DAG files dag_processing.total_parse_time Total time taken to scan and import all DAG files once.
Current DAG bag size dagbag_size Number of DAGs found when the scheduler ran a scan based on its configuration.
Failed SLA miss email notifications sla_email_notification_failure Number of failed SLA miss email notification attempts.
Open slots on executor executor.open_slots Number of open slots on the executor.
Queued tasks on executor executor.queued_tasks Number of queued tasks on the executor.
Running tasks on executor executor.running_tasks Number of running tasks on the executor.
Task instance successes/failures ti_failures, ti_successes Overall task instance successes/failures.
Started/finished jobs <job_name>_start, <job_name>_end Number of started/finished jobs, such as SchedulerJob, LocalTaskJob.
Job heartbeat failures <job_name>_heartbeat_failure Number of failed heartbeats for a job.
Tasks created per operator task_instance_created-<operator_name> Number of task instances created for a given operator.
Operator executions operator_failures_<operator_name>, operator_successes_<operator_name> Number of finished task instances per operator.
Open slots in the pool pool.open_slots.<pool_name> Number of open slots in the pool.
Queued slots in the pool pool.queued_slots.<pool_name> Number of queued slots in the pool.
Running slots in the pool pool.running_slots.<pool_name> Number of running slots in the pool.
Starving tasks in the pool pool.starving_tasks.<pool_name> Number of starving tasks in the pool.
Time spent in scheduler's critical section scheduler.critical_section_duration Time spent in the critical section of scheduler loop. Only a single scheduler can enter this loop at a time.
Critical section lock failures scheduler.critical_section_busy Count of times a scheduler process tried to get a lock on the critical section (needed to send tasks to the executor) and found it locked by another process.
Externally killed tasks scheduler.tasks.killed_externally Number of tasks killed externally.
Orphaned tasks scheduler.orphaned_tasks.cleared, scheduler.orphaned_tasks.adopted Number of orphaned tasks cleared/adopted by the scheduler.
Running/starving/executable tasks scheduler.tasks.running, scheduler.tasks.starving, scheduler.tasks.executable Number of running/starving/executable tasks.
Scheduler heartbeats scheduler_heartbeat Scheduler heartbeats.
Failed SLA callback notifications sla_callback_notification_failure Number of failed SLA miss callback notification attempts.
Smart sensor poking exception failures smart_sensor_operator.exception_failures Number of failures caused by exception in the previous smart sensor poking loop.
Smart sensor poking infrastructure failures smart_sensor_operator.infra_failures Number of infrastructure failures in the previous smart sensor poking loop.
Smart sensor poking exceptions smart_sensor_operator.poked_exception Number of exceptions in the previous smart sensor poking loop.
Smart sensor successfully poked tasks smart_sensor_operator.poked_success Number of newly succeeded tasks poked by the smart sensor in the previous poking loop.
Smart sensor poked tasks smart_sensor_operator.poked_tasks Number of tasks poked by the smart sensor in the previous poking loop.
Previously succeeded task instances previously_succeeded Number of previously succeeded task instances.
Killed zombie tasks zombies_killed Number of killed zombie tasks.
DAG run duration dagrun.duration.success.<dag_id>, dagrun.duration.failed.<dag_id> Time taken for a DagRun to reach success/failed state.
DAG dependency check duration dagrun.dependency-check.<dag_id> Time taken to check DAG dependencies. This metric is different from the environment dependency and permission checks metrics, and applies to DAGs.
DAG run schedule delay dagrun.schedule_delay.<dag_id> Time of delay between the scheduled DagRun start date and the actual DagRun start date.
Finished tasks ti.finish.<dag_id>.<task_id>.<state> Number of completed tasks in a given DAG.
Task instance run duration dag.<dag_id>.<task_id>.duration Time taken to finish a task.
Started tasks ti.start.<dag_id>.<task_id> Number of started tasks in a given DAG.
Tasks removed from DAG task_removed_from_dag.<dag_id> Number of tasks removed for a given DAG (that is, the task no longer exists in the DAG).
Tasks restored to DAG task_restored_to_dag.<dag_id> Number of tasks restored for a given DAG (that is, a task instance that was previously in the REMOVED state in the database is added back to the DAG file).
Task schedule delay dagrun.<dag_id>.first_task_scheduling_delay Time elapsed between the first task's start_date and the DAG run's expected start.
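Several names in the Name in Airflow column are templates with placeholders such as `<dag_id>`, `<task_id>`, `<state>`, or `<pool_name>`. The hypothetical helper below fills those placeholders to produce a concrete metric name, which can be handy when searching for one specific DAG, task, or pool. It only performs string substitution and makes no claim about how Cloud Composer maps these names to its own metrics.

```python
import re

# Matches placeholders such as <dag_id> or <pool_name> in a metric name template.
_PLACEHOLDER = re.compile(r"<([a-z_]+)>")


def expand_metric_name(template: str, **values: object) -> str:
    """Replace each <placeholder> in `template` with the matching keyword argument.

    Raises KeyError if a placeholder has no value.
    """
    def substitute(match: "re.Match[str]") -> str:
        key = match.group(1)
        if key not in values:
            raise KeyError(f"missing value for <{key}>")
        return str(values[key])

    return _PLACEHOLDER.sub(substitute, template)
```

For example, `expand_metric_name("ti.finish.<dag_id>.<task_id>.<state>", dag_id="etl", task_id="load", state="success")` returns `"ti.finish.etl.load.success"`.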

Using Monitoring for Cloud Composer environments


You can use Metrics Explorer to display metrics related to your environments and DAGs:

  • Cloud Composer Environment resource contains metrics for environments.

    To show metrics for a specific environment, filter metrics by the environment_name label. You can also filter by other labels, such as the environment's location or image version.

  • Cloud Composer Workflow resource contains metrics for DAGs.

    To show metrics for a specific DAG or task, filter metrics by the workflow_name and task_name labels. You can also filter by other labels, such as task status or Airflow operator name.
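The same label filters can be written as a Cloud Monitoring filter string, for example when you query metrics outside Metrics Explorer. The sketch below assumes the monitored resource types `cloud_composer_environment` and `cloud_composer_workflow`, that `environment_name` and `workflow_name` are resource labels, and that `task_name` is a metric label; verify these names in the Cloud Monitoring metrics reference before relying on them. Label values are assumed not to contain double quotes.

```python
from typing import Dict, Optional


def composer_filter(
    resource_type: str,
    resource_labels: Optional[Dict[str, str]] = None,
    metric_labels: Optional[Dict[str, str]] = None,
    metric_type: Optional[str] = None,
) -> str:
    """Build a Cloud Monitoring filter string joined with AND.

    Labels are emitted in sorted order so the output is deterministic.
    """
    clauses = [f'resource.type = "{resource_type}"']
    if metric_type:
        clauses.append(f'metric.type = "{metric_type}"')
    for key, value in sorted((resource_labels or {}).items()):
        clauses.append(f'resource.labels.{key} = "{value}"')
    for key, value in sorted((metric_labels or {}).items()):
        clauses.append(f'metric.labels.{key} = "{value}"')
    return " AND ".join(clauses)
```

For example, `composer_filter("cloud_composer_workflow", resource_labels={"workflow_name": "etl"}, metric_labels={"task_name": "load"})` selects the metrics of the `load` task in the `etl` DAG.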

API and gcloud

You can create and manage custom dashboards and their widgets through the Cloud Monitoring API and the gcloud monitoring dashboards command. For more information, see Manage dashboards by API.
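As a sketch of what such a dashboard definition can look like, the Python function below builds a one-widget dashboard in the JSON shape used by the Cloud Monitoring dashboards API (displayName, gridLayout, xyChart, timeSeriesQuery). The metric type is a parameter: pass the value from the API column of the metric you want to chart. The resource type and label names are assumptions to verify against the metrics reference, and the aligner default is a choice for illustration (for boolean health metrics, `ALIGN_FRACTION_TRUE` may be more meaningful than `ALIGN_MEAN`).

```python
import json
from typing import Dict


def dashboard_config(title: str, environment_name: str, metric_type: str,
                     aligner: str = "ALIGN_MEAN") -> Dict[str, object]:
    """Return a minimal dashboard definition with one line chart."""
    metric_filter = (
        f'metric.type = "{metric_type}" AND '
        'resource.type = "cloud_composer_environment" AND '
        f'resource.labels.environment_name = "{environment_name}"'
    )
    return {
        "displayName": title,
        "gridLayout": {
            "widgets": [{
                "title": f"{environment_name}: {metric_type}",
                "xyChart": {
                    "dataSets": [{
                        "timeSeriesQuery": {
                            "timeSeriesFilter": {
                                "filter": metric_filter,
                                # Align each series to one point per minute.
                                "aggregation": {
                                    "alignmentPeriod": "60s",
                                    "perSeriesAligner": aligner,
                                },
                            }
                        },
                        "plotType": "LINE",
                    }]
                },
            }]
        },
    }


if __name__ == "__main__":
    # The metric type below is a placeholder; replace it with a real value.
    print(json.dumps(dashboard_config("Composer overview", "example-environment",
                                      "composer.googleapis.com/METRIC_NAME"), indent=2))
```

You can save the printed JSON as dashboard.json and create the dashboard with `gcloud monitoring dashboards create --config-from-file=dashboard.json`.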

For more information about resources, metrics, and filters, see the Cloud Monitoring API reference.

Using Cloud Monitoring alerts

You can create alerting policies to monitor the values of metrics and to notify you when those metrics violate a condition.

  1. In the navigation panel of the Google Cloud console, select Monitoring, and then select Alerting.

  2. If you want to be notified and haven't created notification channels yet, click Edit Notification Channels and add your notification channels. Return to the Alerting page after you add your channels.
  3. From the Alerting page, select Create policy.
  4. To select the metric, expand the Select a metric menu and then do the following:
    1. To limit the menu to relevant entries, enter Cloud Composer into the filter bar. If there are no results after you filter the menu, then disable the Show only active resources & metrics toggle.
    2. For the Resource type, select Cloud Composer Environment or Cloud Composer Workflow.
    3. Select a Metric category and a Metric, and then select Apply.
  5. Click Next.
  6. The settings in the Configure alert trigger page determine when the alert is triggered. Select a condition type and, if necessary, specify a threshold. For more information, see Create metric-threshold alerting policies.
  7. Click Next.
  8. Optional: To add notifications to your alerting policy, click Notification channels. In the dialog, select one or more notification channels from the menu, and then click OK.
  9. Optional: Update the Incident autoclose duration. This field determines when Monitoring closes incidents in the absence of metric data.
  10. Optional: Click Documentation, and then add any information that you want included in a notification message.
  11. Click Alert name and enter a name for the alerting policy.
  12. Click Create Policy.
For more information, see Alerting policies.
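The console steps above correspond to fields of an alerting policy definition in the Cloud Monitoring API. The sketch below builds such a definition as a Python dictionary with a single metric-threshold condition; the comments map fields to the numbered steps. The default threshold settings, aligner, and durations are illustrative assumptions, and the metric filter must be supplied by you (for example, one that selects a Cloud Composer Environment or Cloud Composer Workflow metric).

```python
import json
from typing import Dict, Optional, Sequence


def alert_policy(
    display_name: str,
    metric_filter: str,
    threshold: float,
    comparison: str = "COMPARISON_LT",
    duration_s: int = 300,
    autoclose_s: int = 1800,
    aligner: str = "ALIGN_MEAN",
    notification_channels: Sequence[str] = (),
    documentation: Optional[str] = None,
) -> Dict[str, object]:
    """Return an alerting policy definition with one threshold condition."""
    policy: Dict[str, object] = {
        "displayName": display_name,                # step 11: alert name
        "combiner": "OR",
        "conditions": [{
            "displayName": f"{display_name} condition",
            "conditionThreshold": {                 # step 6: alert trigger settings
                "filter": metric_filter,            # step 4: selected metric
                "comparison": comparison,
                "thresholdValue": threshold,
                "duration": f"{duration_s}s",
                "aggregations": [{"alignmentPeriod": "60s", "perSeriesAligner": aligner}],
            },
        }],
        "notificationChannels": list(notification_channels),  # step 8
        "alertStrategy": {"autoClose": f"{autoclose_s}s"},     # step 9: incident autoclose
    }
    if documentation:
        # Step 10: text included in notification messages.
        policy["documentation"] = {"content": documentation, "mimeType": "text/markdown"}
    return policy


if __name__ == "__main__":
    print(json.dumps(alert_policy(
        "Composer environment unhealthy",
        'resource.type = "cloud_composer_environment"',  # placeholder filter
        threshold=1.0,
        documentation="Check the Airflow UI and scheduler logs.",
    ), indent=2))
```

A JSON file produced this way can be passed to the gcloud monitoring policies create command with `--policy-from-file`; depending on your gcloud version, that command may only be available in the alpha or beta release track.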

What's next