Run an Apache Airflow DAG in Cloud Composer 3

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

This quickstart guide shows you how to create a Cloud Composer environment and run an Apache Airflow DAG in Cloud Composer 3.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Cloud Composer API.

    Enable the API

  7. To get the permissions that you need to complete this quickstart, ask your administrator to grant you the following IAM roles on your project:

    For more information about granting roles, see Manage access to projects, folders, and organizations.

    You might also be able to get the required permissions through custom roles or other predefined roles.

Create an environment's service account

When you create an environment, you specify a service account. This service account is called environment's service account. Your environment uses this service account to perform most of the operations.

The service account for your environment is not a user account. A service account is a special kind of account used by an application or a virtual machine (VM) instance, not a person.

To create a service account for your environment:

  1. Create a new service account, as described in the Identity and Access Management documentation.

  2. Grant a role to it, as described in the Identity and Access Management documentation. The required role is Composer Worker (composer.worker).

Create an environment

  1. In the Google Cloud console, go to the Create environment page.

    Go to Create environment

  2. In the Name field, enter example-environment.

  3. In the Location drop-down list, select a region for the Cloud Composer environment. This guide uses us-central1 region.

  4. For other environment configuration options, use the provided defaults.

  5. Click Create and wait until the environment is created.

  6. When done, a green check mark is displayed next to the environment name.

Create a DAG file

An Airflow DAG is a collection of organized tasks that you want to schedule and run. DAGs are defined in standard Python files.

This guide uses an example Airflow DAG defined in the quickstart.py file. Python code in this file does the following:

  1. Creates a DAG, composer_sample_dag. This DAG runs every day.
  2. Executes one task, print_dag_run_conf. The task prints the DAG run's configuration by using the bash operator.

Save a copy of the quickstart.py file on your local machine:

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

Upload the DAG file to your environment's bucket

Every Cloud Composer environment has a Cloud Storage bucket associated with it. Airflow in Cloud Composer schedules only DAGs that are located in the /dags folder in this bucket.

To schedule your DAG, upload quickstart.py from your local machine to your environment's /dags folder:

  1. In the Google Cloud console, go to the Environments page.

    Go to Environments

  2. In the list of environments, click the name of your environment, example-environment. The Environment details page opens.

  3. Click Open DAGs folder. The Bucket details page opens.

  4. Click Upload files and then select your copy of quickstart.py.

  5. To upload the file, click Open.

View the DAG

After you upload the DAG file, Airflow does the following:

  1. Parses the DAG file that you uploaded. It might take a few minutes for the DAG to become available to Airflow.
  2. Adds the DAG to the list of available DAGs.
  3. Executes the DAG according to the schedule you provided in the DAG file.

Check that your DAG is processed without errors and is available in Airflow by viewing it in DAG UI. DAG UI is Cloud Composer interface for viewing DAG information in Google Cloud console. Cloud Composer also provides access to Airflow UI, which is a native Airflow web interface.

  1. Wait about five minutes to give Airflow time to process the DAG file that you uploaded previously, and to complete the first DAG run (explained later).

  2. In the Google Cloud console, go to the Environments page.

    Go to Environments

  3. In the list of environments, click the name of your environment, example-environment. The Environment details page opens.

  4. Go to the DAGs tab.

  5. Check that the composer_quickstart DAG is present in the list of DAGs.

    The list of DAGs displays the composer_quickstart DAG with
    additional information such as state and schedule
    Figure 1. The list of DAGs displays the composer_quickstart DAG (click to enlarge)

View DAG run details

A single execution of a DAG is called a DAG run. Airflow immediately executes a DAG run for the example DAG because the start date in the DAG file is set to yesterday. In this way, Airflow catches up to the specified DAG's schedule.

The example DAG contains one task, print_dag_run_conf, which runs the echo command in the console. This command outputs meta information about the DAG (DAG run's numeric identifier).

  1. On the DAGs tab, click composer_quickstart. The Runs tab for the DAG opens.

  2. In the list of DAG runs, click the first entry.

    The list of DAG runs shows the recent DAG run (its execution date
    and status)
    Figure 2. The list of DAG runs for the composer_quickstart DAG (click to enlarge)
  3. DAG run details are displayed, detailing the information about individual tasks of the example DAG.

    The list of tasks with an entry print_dag_run_conf, its start
    time, end time, and duration
    Figure 3. The list of tasks that were executed in the DAG run (click to enlarge)
  4. The Logs for DAG run section lists logs for all tasks in the DAG run. You can see the output of the echo command in the logs.

    Log entries of the task, one of them is Output and the other lists
    an identifier
    Figure 4. Logs of the print_dag_run_conf task (click to enlarge)

Clean up

To avoid incurring charges to your Google Cloud account for the resources used on this page, follow these steps.

Delete the resources used in this tutorial:

  1. Delete the Cloud Composer environment:

    1. In the Google Cloud console, go to the Environments page.

      Go to Environments

    2. Select example-environment and click Delete.

    3. Wait until the environment is deleted.

  2. Delete your environment's bucket. Deleting the Cloud Composer environment does not delete its bucket.

    1. In the Google Cloud console, go to the Storage > Browser page.

      Go to Storage > Browser

    2. Select the environment's bucket and click Delete. For example, this bucket can be named us-central1-example-environ-c1616fe8-bucket.

  3. Delete the persistent disk of your environment's Redis queue. Deleting the Cloud Composer environment does not delete its persistent disk.

    1. In the Google Cloud console, go to the Compute Engine > Disks.

      Go to Disks

    2. Select the environment's Redis queue persistent disk and click Delete.

      For example, this disk can be named gke-us-central1-exampl-pvc-b12055b6-c92c-43ff-9de9-10f2cc6fc0ee. Disks for Cloud Composer 1 always have the Standard persistent disk type and the size of 2 GB.

What's next