Run an Apache Airflow DAG in Cloud Composer 2 (Google Cloud CLI)

This quickstart guide shows you how to create a Cloud Composer environment and run an Apache Airflow DAG in Cloud Composer 2.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Cloud Composer API:

    gcloud services enable composer.googleapis.com
  7. To get the permissions that you need to complete this quickstart, ask your administrator to grant you the following IAM roles on your project:

    For more information about granting roles, see Manage access to projects, folders, and organizations.

    You might also be able to get the required permissions through custom roles or other predefined roles.

Create an environment's service account

When you create an environment, you specify a service account. This service account is called the environment's service account. Your environment uses it to perform most of its operations.

The service account for your environment is not a user account. A service account is a special kind of account used by an application or a virtual machine (VM) instance, not a person.

To create a service account for your environment:

  1. Create a new service account, as described in the Identity and Access Management documentation.

  2. Grant a role to it, as described in the Identity and Access Management documentation. The required role is Composer Worker (composer.worker).

Create an environment

If this is the first environment in your project, add the Cloud Composer Service Agent account as a new principal on your environment's service account and grant it the roles/composer.ServiceAgentV2Ext role.

By default, your environment uses the default Compute Engine service account. The following example grants this role on that account. If your environment uses a different service account, substitute its email address for the Compute Engine address in the command.

# Get current project's project number
PROJECT_NUMBER=$(gcloud projects list \
  --filter="$(gcloud config get-value project)" \
  --format="value(PROJECT_NUMBER)" \
  --limit=1)

# Add the Cloud Composer v2 API Service Agent Extension role
gcloud iam service-accounts add-iam-policy-binding \
    $PROJECT_NUMBER-compute@developer.gserviceaccount.com \
    --member serviceAccount:service-$PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com \
    --role roles/composer.ServiceAgentV2Ext
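The two account names in the command above are both derived from the project number. As a minimal sketch of that derivation, the following Python helpers build the same strings. The function names and the project number 123456789012 are made up for illustration and are not part of the guide.

```python
def default_compute_service_account(project_number: str) -> str:
    """Email of the default Compute Engine service account (the binding target)."""
    return f"{project_number}-compute@developer.gserviceaccount.com"


def composer_service_agent_member(project_number: str) -> str:
    """Principal string for the Cloud Composer Service Agent (the --member value)."""
    return (
        f"serviceAccount:service-{project_number}"
        "@cloudcomposer-accounts.iam.gserviceaccount.com"
    )


# 123456789012 is a hypothetical project number used only for illustration.
print(default_compute_service_account("123456789012"))
print(composer_service_agent_member("123456789012"))
```

Printing both values before running the gcloud command is one way to confirm that the binding targets the account you expect.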

Create a new environment named example-environment in the us-central1 region. The command pins the Cloud Composer 2 image version composer-2.9.11-airflow-2.9.3, the latest version at the time of writing.

gcloud composer environments create example-environment \
    --location us-central1 \
    --image-version composer-2.9.11-airflow-2.9.3
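The value passed to --image-version combines a Cloud Composer version and an Airflow version. The following sketch splits a string of the exact form used above into its two parts. It assumes the full three-part form shown in this guide; other forms, such as version aliases, are deliberately rejected, and the helper name split_image_version is hypothetical.

```python
import re

# Matches only the full form used in this guide: composer-X.Y.Z-airflow-A.B.C
IMAGE_VERSION_PATTERN = re.compile(
    r"^composer-(?P<composer>\d+\.\d+\.\d+)-airflow-(?P<airflow>\d+\.\d+\.\d+)$"
)


def split_image_version(image_version: str) -> tuple[str, str]:
    """Return (composer_version, airflow_version) for a full image version string."""
    match = IMAGE_VERSION_PATTERN.match(image_version)
    if match is None:
        raise ValueError(f"Unrecognized image version: {image_version!r}")
    return match["composer"], match["airflow"]


composer_version, airflow_version = split_image_version("composer-2.9.11-airflow-2.9.3")
print(composer_version, airflow_version)
```

Knowing the Airflow part of the version is useful when you check that a DAG's imports and arguments are supported by the environment.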

Create a DAG file

An Airflow DAG is a collection of organized tasks that you want to schedule and run. DAGs are defined in standard Python files.

This guide uses an example Airflow DAG defined in the quickstart.py file. Python code in this file does the following:

  1. Creates a DAG, composer_quickstart. This DAG runs every day.
  2. Executes one task, print_dag_run_conf. The task prints the DAG run's numeric ID by using the bash operator.

Save a copy of the quickstart.py file on your local machine:

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the DAG run's id to the Airflow task log
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )
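Before you upload the file, a quick local check can catch syntax errors and confirm the DAG ID. The sketch below is one possible approach, not part of the guide: it uses Python's standard ast module, so Airflow does not need to be installed locally. The helper name find_dag_ids and the SAMPLE string are illustrative, and the check does not replace Airflow's own DAG parsing.

```python
import ast


def find_dag_ids(source: str) -> list[str]:
    """Return string literals passed as the first positional argument to calls named DAG."""
    tree = ast.parse(source)  # Raises SyntaxError if the file is not valid Python.
    dag_ids = []
    for node in ast.walk(tree):
        if not isinstance(node, ast.Call):
            continue
        func = node.func
        # Handles both models.DAG(...) (an attribute) and DAG(...) (a plain name).
        name = func.attr if isinstance(func, ast.Attribute) else getattr(func, "id", None)
        if name != "DAG" or not node.args:
            continue
        first_arg = node.args[0]
        if isinstance(first_arg, ast.Constant) and isinstance(first_arg.value, str):
            dag_ids.append(first_arg.value)
    return dag_ids


# A shortened stand-in for quickstart.py; in practice, read the real file's text.
SAMPLE = '''
from airflow import models

with models.DAG("composer_quickstart", catchup=False) as dag:
    pass
'''

print(find_dag_ids(SAMPLE))
```

To check the real file, pass the result of reading quickstart.py as text instead of SAMPLE.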

Upload the DAG file to your environment's bucket

Every Cloud Composer environment has a Cloud Storage bucket associated with it. Airflow in Cloud Composer schedules only DAGs that are located in the /dags folder in this bucket.

To schedule your DAG, upload quickstart.py from your local machine to your environment's /dags folder. Run the following Google Cloud CLI command in the folder where the quickstart.py file is located:

gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py

View the DAG

After you upload the DAG file, Airflow does the following:

  1. Parses the DAG file that you uploaded. It might take a few minutes for the DAG to become available to Airflow.
  2. Adds the DAG to the list of available DAGs.
  3. Executes the DAG according to the schedule you provided in the DAG file.

Check that your DAG is processed without errors and is available in Airflow. You can view it in the DAG UI, which is the Cloud Composer interface for viewing DAG information in the Google Cloud console, or in the Airflow UI, which is the native Airflow web interface. This guide uses the Airflow CLI instead:

  1. Wait about five minutes to give Airflow time to process the DAG file that you uploaded previously, and to complete the first DAG run (explained later).

  2. Run the following command in Google Cloud CLI. This command executes the dags list Airflow CLI command that lists DAGs in your environment.

    gcloud composer environments run example-environment \
    --location us-central1 \
    dags list
    
  3. Check that the composer_quickstart DAG is listed in the command's output.

    Example output:

    Executing the command: [ airflow dags list ]...
    Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb
    Use ctrl-c to interrupt the command
    dag_id              | filepath              | owner            | paused
    ====================+=======================+==================+=======
    airflow_monitoring  | airflow_monitoring.py | airflow          | False
    composer_quickstart | dag-quickstart-af2.py | Composer Example | False
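If you capture this output in a script, the check in step 3 can be automated. The following sketch parses the pipe-separated table from the example output and looks for a DAG ID. The helper names parse_table and is_dag_listed are hypothetical, and the parsing assumes the table layout shown above.

```python
SAMPLE_OUTPUT = """\
Executing the command: [ airflow dags list ]...
dag_id              | filepath              | owner            | paused
====================+=======================+==================+=======
airflow_monitoring  | airflow_monitoring.py | airflow          | False
composer_quickstart | dag-quickstart-af2.py | Composer Example | False
"""


def parse_table(text: str) -> list[dict[str, str]]:
    """Parse a pipe-separated table into a list of {column: value} rows."""
    header = None
    rows = []
    for line in text.splitlines():
        if "|" not in line:
            continue  # Skips status lines and the ====+==== separator.
        cells = [cell.strip() for cell in line.split("|")]
        if header is None:
            header = cells  # The first pipe-separated line holds column names.
        else:
            rows.append(dict(zip(header, cells)))
    return rows


def is_dag_listed(text: str, dag_id: str) -> bool:
    """Return True if dag_id appears in the dag_id column of the table."""
    return any(row.get("dag_id") == dag_id for row in parse_table(text))


print(is_dag_listed(SAMPLE_OUTPUT, "composer_quickstart"))
```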
    

View DAG run details

A single execution of a DAG is called a DAG run. Airflow immediately executes a DAG run for the example DAG because the start date in the DAG file is set to yesterday. In this way, Airflow catches up to the specified DAG's schedule.
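The timing can be illustrated with plain date arithmetic. The sketch below is a simplified model, not Airflow's actual scheduler logic: it assumes that a scheduled run covers the interval that begins at the start date and becomes due once that interval has ended. The function name first_run_is_due and the fixed timestamp are illustrative.

```python
import datetime


def first_run_is_due(
    start_date: datetime.datetime,
    interval: datetime.timedelta,
    now: datetime.datetime,
) -> bool:
    """Simplified model: the first run is due once its interval has fully elapsed."""
    return start_date + interval <= now


# A fixed "now" keeps the example reproducible.
now = datetime.datetime(2024, 2, 18, 15, 38, 39)
one_day = datetime.timedelta(days=1)

# Start date set to yesterday, as in quickstart.py: the first daily interval has ended.
print(first_run_is_due(now - one_day, one_day, now))

# Start date set to the current time: the first interval ends a day from now.
print(first_run_is_due(now, one_day, now))
```

Under this model the first call prints True and the second prints False, which matches the behavior described above: with a start date of yesterday and a daily schedule, a run is due as soon as the DAG is parsed.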

The example DAG contains one task, print_dag_run_conf, which runs the echo command in a Bash shell. The command prints the DAG run's numeric identifier to the task log.

Run the following command in Google Cloud CLI. This command lists DAG runs for the composer_quickstart DAG:

gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart

Example output:

dag_id              | run_id                                      | state   | execution_date                   | start_date                       | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00
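In the example output, the run_id value consists of a run type prefix, a double underscore, and the same timestamp that appears in the execution_date column. As a small sketch under that assumption, the following helper extracts the timestamp from a run_id of this shape. The function name is hypothetical.

```python
import datetime


def execution_date_from_run_id(run_id: str) -> str:
    """Return the timestamp part of a run_id shaped like 'scheduled__<timestamp>'."""
    _prefix, separator, timestamp = run_id.partition("__")
    if not separator:
        raise ValueError(f"run_id has no '__' separator: {run_id!r}")
    # Raises ValueError if the remainder is not an ISO 8601 timestamp.
    datetime.datetime.fromisoformat(timestamp)
    return timestamp


print(execution_date_from_run_id("scheduled__2024-02-17T15:38:38.969307+00:00"))
```

Reading the execution_date column directly gives the same value. The helper is only a convenience when a script has the run_id but not the full table.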

Airflow CLI does not provide a command to view task logs. You can use other methods to view Airflow task logs: Cloud Composer DAG UI, Airflow UI, or Cloud Logging. This guide shows a way to query Cloud Logging for logs from a specific DAG run.

Run the following command in Google Cloud CLI. This command reads logs from Cloud Logging for a specific DAG run of the composer_quickstart DAG. The --format argument formats the output so that only the text of the log message is displayed.

gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"

Replace:

  • RUN_ID with the execution_date value from the output of the dags list-runs command that you ran previously. This is the timestamp part of the run_id value, without the scheduled__ prefix. For example, 2024-02-17T15:38:38.969307+00:00.
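The filter passed to gcloud logging read is a list of space-separated conditions, which Cloud Logging combines with an implicit AND. The following sketch assembles the same filter string in Python, which can help when the quoting in a shell becomes hard to read. The function name build_log_filter is an assumption made for illustration. The label names are taken from the command above.

```python
def build_log_filter(
    location: str, environment: str, dag_id: str, execution_date: str
) -> str:
    """Build a Cloud Logging filter for the task logs of one DAG run."""
    clauses = [
        "resource.type=cloud_composer_environment",
        f"resource.labels.location={location}",
        f"resource.labels.environment_name={environment}",
        f"labels.workflow={dag_id}",
        # The label name contains a hyphen, so it is quoted, as in the command above.
        f'labels."execution-date"="{execution_date}"',
    ]
    # Space-separated clauses are combined with an implicit AND.
    return " ".join(clauses)


print(
    build_log_filter(
        "us-central1",
        "example-environment",
        "composer_quickstart",
        "2024-02-17T15:38:38.969307+00:00",
    )
)
```

The resulting string can be passed as the single positional argument of gcloud logging read.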

Example output:

...

Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17 15:38:38.969307+00:00
Started process 22544 to run task

...

Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746

...

Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart, task_id=print_dag_run_conf, execution_date=20240217T153838, start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check

Clean up

To avoid incurring charges to your Google Cloud account for the resources used on this page, either delete the Google Cloud project that contains the resources, or keep the project and delete the individual resources.

To delete the individual resources used in this tutorial:

  1. Delete the Cloud Composer environment:

    1. In the Google Cloud console, go to the Environments page.

    2. Select example-environment and click Delete.

    3. Wait until the environment is deleted.

  2. Delete your environment's bucket. Deleting the Cloud Composer environment does not delete its bucket.

    1. In the Google Cloud console, go to the Storage > Browser page.

    2. Select the environment's bucket and click Delete. For example, this bucket can be named us-central1-example-environ-c1616fe8-bucket.

  3. Delete the persistent disk of your environment's Redis queue. Deleting the Cloud Composer environment does not delete its persistent disk.

    1. In the Google Cloud console, go to the Compute Engine > Disks page.

    2. Select the environment's Redis queue persistent disk and click Delete.

      For example, this disk can be named pvc-02bc4842-2312-4347-8519-d87bdcd31115. Disks for Cloud Composer 2 always have the Balanced persistent disk type and a size of 2 GB.

What's next