Run an Apache Airflow DAG in Cloud Composer 1

This page shows you how to create a Cloud Composer environment and run an Apache Airflow DAG in Cloud Composer.

If you are new to Airflow, see the Apache Airflow tutorial for more information about Airflow concepts, objects, and their usage.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Cloud Composer API.

    Enable the API
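
    If you prefer working from the command line, you can also enable the Cloud Composer API with gcloud; this assumes the gcloud CLI is installed and authenticated for your project:

    gcloud services enable composer.googleapis.com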

Create an environment

Console

  1. In the Google Cloud console, go to the Create environment page.

    Go to Create environment

  2. In the Name field, enter example-environment.

  3. In the Location drop-down list, select a region for the Cloud Composer environment. See Available regions for information about selecting a region.

  4. For other environment configuration options, use the provided defaults.

  5. To create the environment, click Create.

  6. Wait until the environment is created. When done, a green check mark shows next to the environment name.

gcloud

Add the Cloud Composer Service Agent account as a new principal on your environment's service account and grant it the Cloud Composer v2 API Service Agent Extension (roles/composer.ServiceAgentV2Ext) role.

By default, your environment uses the default Compute Engine service account.

# Get current project's project number
PROJECT_NUMBER=$(gcloud projects list \
  --filter="$(gcloud config get-value project)" \
  --format="value(PROJECT_NUMBER)" \
  --limit=1)

# Add the Cloud Composer v2 API Service Agent Extension role
gcloud iam service-accounts add-iam-policy-binding \
    $PROJECT_NUMBER-compute@developer.gserviceaccount.com \
    --member serviceAccount:service-$PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com \
    --role roles/composer.ServiceAgentV2Ext
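
Optionally, to confirm that the binding was applied, you can inspect the IAM policy of the service account. This check reuses the PROJECT_NUMBER variable from the previous command:

# View the IAM policy of the environment's service account
gcloud iam service-accounts get-iam-policy \
    $PROJECT_NUMBER-compute@developer.gserviceaccount.com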

Create a new environment:

gcloud composer environments create ENVIRONMENT_NAME \
  --location LOCATION \
  --image-version IMAGE_VERSION

Replace:

  • ENVIRONMENT_NAME with the name of the environment. This quickstart uses example-environment.
  • LOCATION with a region for the Cloud Composer environment. See Available regions for information about selecting a region.
  • IMAGE_VERSION with the name of the Cloud Composer image. This guide uses composer-1.20.12-airflow-1.10.15, a Cloud Composer 1 image that runs Airflow 1.10.15.

Example:

gcloud composer environments create example-environment \
  --location us-central1 \
  --image-version composer-1.20.12-airflow-1.10.15
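
Environment creation takes some time. To check whether the environment is ready, you can optionally query its state; this assumes the example-environment name and us-central1 region from the example above. The command prints RUNNING once the environment is ready:

gcloud composer environments describe example-environment \
  --location us-central1 \
  --format="value(state)"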

Terraform

To configure this environment using Terraform, add the following resource block to your Terraform configuration and run terraform apply.

To use this resource block, the service account that Terraform uses must have a role that includes the composer.environments.create permission. For more information about the service account for Terraform, see Google Provider Configuration Reference.

For more information about using Terraform to create a Cloud Composer environment, refer to the Terraform documentation.

resource "google_composer_environment" "example" {
  name = "ENVIRONMENT_NAME"
  region = "LOCATION"

  config {
    software_config {
      image_version = "IMAGE_VERSION"
    }
  }
}

Replace:

  • ENVIRONMENT_NAME with the name of the environment. This quickstart uses example-environment.

  • LOCATION with a region for the Cloud Composer environment. See Available regions for information about selecting a region.

  • IMAGE_VERSION with the name of the Cloud Composer image. This guide uses composer-1.20.12-airflow-1.10.15, a Cloud Composer 1 image that runs Airflow 1.10.15.

Example:

resource "google_composer_environment" "example" {
  name = "example-environment"
  region = "us-central1"

  config {
    software_config {
      image_version = "composer-1.20.12-airflow-1.10.15"
    }
  }

}
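
As noted above, you apply this configuration with Terraform. A minimal sequence of commands, assuming the resource block is saved in a Terraform working directory where the Google provider is already configured:

# Initialize the working directory and apply the configuration
terraform init
terraform apply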

View environment details

After the environment creation finishes, you can view your environment's information, such as the Cloud Composer version, the URL for the Airflow web interface, and the DAGs folder in Cloud Storage.

To view the environment information:

  1. In the Google Cloud console, go to the Environments page.

    Go to Environments

  2. To view the Environment details page, click the name of your environment, example-environment.
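
You can also retrieve the same details from the command line. The following command is an optional alternative that assumes the example-environment name and us-central1 region used in this guide; its output includes the Airflow web interface URL (config.airflowUri) and the DAGs folder (config.dagGcsPrefix):

gcloud composer environments describe example-environment \
  --location us-central1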

Create a DAG

An Airflow DAG is a collection of organized tasks that you want to schedule and run. DAGs are defined in standard Python files.

The Python code in quickstart.py:

  1. Creates a DAG, composer_sample_dag. The DAG runs once per day.
  2. Executes one task, print_dag_run_conf. The task prints the DAG run's configuration by using the bash operator.

To create a DAG, create a copy of the quickstart.py file on your local machine.

Airflow 1

import datetime

import airflow
from airflow.operators import bash_operator

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with airflow.DAG(
    "composer_sample_dag",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash_operator.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

Airflow 2

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )
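
Before you upload the file, you can optionally check that it parses without errors. This assumes a local Python environment with the apache-airflow package installed; if the command exits without raising an exception, the DAG definition is at least syntactically valid:

python quickstart.py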

Upload the DAG to Cloud Storage

Cloud Composer schedules only the DAGs that are located in the /dags folder in the environment's Cloud Storage bucket.

To schedule your DAG, upload quickstart.py from your local machine to your environment's /dags folder.

Console

  1. In the Google Cloud console, go to the Environments page.

    Go to Environments

  2. To open the /dags folder, follow the DAGs folder link for example-environment.

  3. On the Bucket details page, click Upload files and then select your local copy of quickstart.py.

  4. To upload the file, click Open.

    After you upload your DAG, Cloud Composer adds the DAG to Airflow and schedules a DAG run immediately. It might take a few minutes for the DAG to show up in the Airflow web interface.

gcloud

To upload quickstart.py with gcloud, run the following command:

gcloud composer environments storage dags import \
  --environment example-environment \
  --location us-central1 \
  --source quickstart.py
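
To confirm that the upload succeeded, you can optionally list the DAG files in your environment's bucket:

gcloud composer environments storage dags list \
  --environment example-environment \
  --location us-central1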

View the DAG in the Airflow UI

Each Cloud Composer environment has a web server that runs the Airflow web interface. You can manage DAGs from the Airflow web interface.

To view the DAG in the Airflow web interface:

Airflow 1

  1. In the Google Cloud console, go to the Environments page.

    Go to Environments

  2. To open the Airflow web interface, click the Airflow link for example-environment. The Airflow UI opens in a new browser window.

  3. In the Airflow toolbar, go to the DAGs page.

  4. To open the DAG details page, click composer_sample_dag.

    Figure 1. DAGs page in the Airflow UI (click to enlarge)

    The page for the DAG shows the Tree View, a graphical representation of the workflow's tasks and dependencies.

    Figure 2. Tree view for the composer_sample_dag DAG

Airflow 2

  1. In the Google Cloud console, go to the Environments page.

    Go to Environments

  2. To open the Airflow web interface, click the Airflow link for example-environment. The Airflow UI opens in a new browser window.

  3. In the Airflow toolbar, go to the DAGs page.

  4. To open the DAG details page, click composer_sample_dag.

    Figure 1. DAGs page in the Airflow UI (click to enlarge)

    The page for the DAG shows the Tree View, a graphical representation of the workflow's tasks and dependencies.

    Figure 2. Tree view for the composer_sample_dag DAG
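
You don't have to go through the console to find the Airflow web interface URL. As an optional alternative, assuming the example-environment name and us-central1 region used in this guide, you can read it from the environment's configuration:

gcloud composer environments describe example-environment \
  --location us-central1 \
  --format="value(config.airflowUri)"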

View task instance details in the Airflow logs

The DAG that you scheduled includes the print_dag_run_conf task. The task prints the DAG run's configuration, which you can see in the Airflow logs for the task instance.

To view the task instance details:

Airflow 1

  1. In the DAG's Tree View in the Airflow web interface, click Graph View.

    If you hold the pointer over the print_dag_run_conf task, its status displays.

    Figure 3. Status of the print_dag_run_conf task
  2. Click the print_dag_run_conf task.

    In the Task Instance context menu, you can get metadata and perform some actions.

    Figure 4. Task Instance context menu for the print_dag_run_conf task
  3. In the Task Instance context menu, click View Log.

  4. In the Log, look for Running: ['bash' to see the output from the bash operator.

    Figure 5. Bash operator log output

Airflow 2

  1. In the DAG's Tree View in the Airflow web interface, click Graph View.

    If you hold the pointer over the print_dag_run_conf task, its status displays.

    Figure 3. Status of the print_dag_run_conf task
  2. Click the print_dag_run_conf task.

    In the Task Instance context menu, you can get metadata and perform some actions.

    Figure 4. Task Instance context menu for the print_dag_run_conf task
  3. In the Task Instance context menu, click Log.

  4. In the log, look for Running command: ['bash' to see the output from the bash operator.

    [2021-10-04 15:27:21,029] {subprocess.py:63} INFO - Running command:
    ['bash', '-c', 'echo 735']
    [2021-10-04 15:27:21,167] {subprocess.py:74} INFO - Output:
    [2021-10-04 15:27:21,168] {subprocess.py:78} INFO - 735
    [2021-10-04 15:27:21,168] {subprocess.py:82} INFO - Command exited with
    return code 0
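
The same task logs are also written to the logs folder in your environment's Cloud Storage bucket, so you can inspect them without opening the Airflow UI. A quick sketch, assuming the gsutil tool is installed and BUCKET_NAME is replaced with your environment's bucket name (for example, us-central1-example-environ-c1616fe8-bucket):

gsutil ls gs://BUCKET_NAME/logs/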
    

Clean up

To avoid incurring charges to your Google Cloud account for the resources used on this page, follow these steps.

Delete the resources used in this tutorial:

  1. Delete the Cloud Composer environment:

    1. In the Google Cloud console, go to the Environments page.

      Go to Environments

    2. Select example-environment and click Delete.

    3. Wait until the environment is deleted.

  2. Delete your environment's bucket. Deleting the Cloud Composer environment does not delete its bucket.

    1. In the Google Cloud console, go to the Storage > Browser page.

      Go to Storage > Browser

    2. Select the environment's bucket and click Delete. For example, this bucket can be named us-central1-example-environ-c1616fe8-bucket.

  3. Delete the persistent disk of your environment's Redis queue. Deleting the Cloud Composer environment does not delete its persistent disk.

    1. In the Google Cloud console, go to the Compute Engine > Disks page.

      Go to Disks

    2. Select the environment's Redis queue persistent disk and click Delete.

      For example, this disk can be named gke-us-central1-exampl-pvc-b12055b6-c92c-43ff-9de9-10f2cc6fc0ee. Disks for Cloud Composer 1 always have the Standard persistent disk type and a size of 2 GB.
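
You can also perform the first two clean-up steps from the command line. This is an optional sketch that assumes the environment name, region, and bucket name used in this guide; deleting a bucket removes everything in it, so double-check the bucket name before running the second command:

# Delete the environment
gcloud composer environments delete example-environment \
  --location us-central1

# Delete the environment's bucket (replace with your bucket's name)
gsutil rm -r gs://us-central1-example-environ-c1616fe8-bucket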

What's next