Run an Apache Airflow DAG in Cloud Composer 2 (Google Cloud CLI)
This quickstart guide shows you how to create a Cloud Composer environment and run an Apache Airflow DAG in Cloud Composer 2.
If you are new to Airflow, see the Airflow concepts tutorial in the Apache Airflow documentation for more information about Airflow concepts, objects, and their usage.
If you want to use the Google Cloud console instead, see Run an Apache Airflow DAG in Cloud Composer.
If you want to create an environment using Terraform, see Create environments (Terraform).
Before you begin
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
- To initialize the gcloud CLI, run the following command:
  gcloud init
- Create or select a Google Cloud project.
  - Create a Google Cloud project:
    gcloud projects create PROJECT_ID
    Replace PROJECT_ID with a name for the Google Cloud project you are creating.
  - Select the Google Cloud project that you created:
    gcloud config set project PROJECT_ID
    Replace PROJECT_ID with your Google Cloud project name.
- Make sure that billing is enabled for your Google Cloud project.
- Enable the Cloud Composer API:
  gcloud services enable composer.googleapis.com
- To get the permissions that you need to complete this quickstart, ask your administrator to grant you the following IAM roles on your project:
  - To view, create, and manage the Cloud Composer environment:
    - Environment and Storage Object Administrator (roles/composer.environmentAndStorageObjectAdmin)
    - Service Account User (roles/iam.serviceAccountUser)
  - To view logs: Logs Viewer (roles/logging.viewer)
  For more information about granting roles, see Manage access to projects, folders, and organizations.
  You might also be able to get the required permissions through custom roles or other predefined roles.
Create an environment
If this is the first environment in your project, then add the Cloud Composer Service Agent account as a new principal on your environment's service account and grant the roles/composer.ServiceAgentV2Ext role to it.
By default, your environment uses the default Compute Engine service account, and the following example shows how to add this permission to it.
# Get current project's project number
PROJECT_NUMBER=$(gcloud projects list \
  --filter="$(gcloud config get-value project)" \
  --format="value(PROJECT_NUMBER)" \
  --limit=1)

# Add the Cloud Composer v2 API Service Agent Extension role
gcloud iam service-accounts add-iam-policy-binding \
  $PROJECT_NUMBER-compute@developer.gserviceaccount.com \
  --member serviceAccount:service-$PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com \
  --role roles/composer.ServiceAgentV2Ext
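The two identities in the command above follow naming patterns built from the project number. As a small illustration, the following Python sketch only formats those names so that they can be checked before running the binding; it does not call any Google Cloud API. The function name and the sample project number 123456789012 are hypothetical.

```python
def composer_binding_identities(project_number: str) -> dict:
    """Return the names used in the IAM policy binding shown above."""
    return {
        # Default Compute Engine service account that the environment uses.
        "environment_service_account": (
            f"{project_number}-compute@developer.gserviceaccount.com"
        ),
        # Cloud Composer Service Agent, in the form passed to --member.
        "member": (
            f"serviceAccount:service-{project_number}"
            "@cloudcomposer-accounts.iam.gserviceaccount.com"
        ),
        "role": "roles/composer.ServiceAgentV2Ext",
    }


# Hypothetical project number, used only for illustration.
identities = composer_binding_identities("123456789012")
for name, value in identities.items():
    print(f"{name}: {value}")
```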
Create a new environment named example-environment in the us-central1 region, with the latest Cloud Composer 2 version.
gcloud composer environments create example-environment \
  --location us-central1 \
  --image-version composer-2.9.11-airflow-2.9.3
Create a DAG file
An Airflow DAG is a collection of organized tasks that you want to schedule and run. DAGs are defined in standard Python files.
This guide uses an example Airflow DAG defined in the quickstart.py file.
Python code in this file does the following:
- Creates a DAG, composer_quickstart. This DAG runs every day.
- Executes one task, print_dag_run_conf. The task prints the DAG run's numeric identifier by using the bash operator.
Save a copy of the quickstart.py file on your local machine:
Upload the DAG file to your environment's bucket
Every Cloud Composer environment has a Cloud Storage bucket associated with it. Airflow in Cloud Composer schedules only DAGs that are located in the /dags folder in this bucket.
To schedule your DAG, upload quickstart.py from your local machine to your environment's /dags folder:
To upload quickstart.py with Google Cloud CLI, run the following command in the folder where the quickstart.py file is located:
gcloud composer environments storage dags import \
  --environment example-environment --location us-central1 \
  --source quickstart.py
View the DAG
After you upload the DAG file, Airflow does the following:
- Parses the DAG file that you uploaded. It might take a few minutes for the DAG to become available to Airflow.
- Adds the DAG to the list of available DAGs.
- Executes the DAG according to the schedule you provided in the DAG file.
Check that your DAG is processed without errors and is available in Airflow by viewing it in the DAG UI. The DAG UI is the Cloud Composer interface for viewing DAG information in the Google Cloud console. Cloud Composer also provides access to the Airflow UI, which is the native Airflow web interface.
Wait about five minutes to give Airflow time to process the DAG file that you uploaded previously, and to complete the first DAG run (explained later).
Run the following command in Google Cloud CLI. This command executes the dags list Airflow CLI command that lists DAGs in your environment.
gcloud composer environments run example-environment \
  --location us-central1 \
  dags list
Check that the composer_quickstart DAG is listed in the command's output.
Example output:
Executing the command: [ airflow dags list ]...
Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb
Use ctrl-c to interrupt the command
dag_id              | filepath              | owner            | paused
====================+=======================+==================+=======
airflow_monitoring  | airflow_monitoring.py | airflow          | False
composer_quickstart | dag-quickstart-af2.py | Composer Example | False
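If you want to automate this check, the table can be parsed with a few lines of Python. The following is a minimal sketch, assuming the output keeps the pipe-delimited layout shown in the example; the function names are hypothetical, and the sample text mirrors the example output rather than a live environment.

```python
def parse_airflow_table(output: str) -> list[dict]:
    """Parse a pipe-delimited table such as the one `airflow dags list` prints."""
    lines = [line.strip() for line in output.splitlines() if line.strip()]
    # Find the ====+==== separator; the header is the line directly above it.
    sep_index = next(
        (i for i, line in enumerate(lines) if set(line) <= set("=+")), None
    )
    if sep_index is None or sep_index == 0:
        raise ValueError("no table found in command output")
    headers = [cell.strip() for cell in lines[sep_index - 1].split("|")]
    rows = []
    for line in lines[sep_index + 1:]:
        cells = [cell.strip() for cell in line.split("|")]
        if len(cells) == len(headers):  # skip any non-table lines
            rows.append(dict(zip(headers, cells)))
    return rows


def is_dag_listed(output: str, dag_id: str) -> bool:
    """Return True if a row with the given dag_id appears in the table."""
    return any(row.get("dag_id") == dag_id for row in parse_airflow_table(output))


SAMPLE_OUTPUT = """\
Executing the command: [ airflow dags list ]...
Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb
Use ctrl-c to interrupt the command
dag_id              | filepath              | owner            | paused
====================+=======================+==================+=======
airflow_monitoring  | airflow_monitoring.py | airflow          | False
composer_quickstart | dag-quickstart-af2.py | Composer Example | False
"""

print(is_dag_listed(SAMPLE_OUTPUT, "composer_quickstart"))  # True
```

Splitting on the separator line rather than on a fixed line number keeps the sketch tolerant of the status messages that precede the table.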
View DAG run details
A single execution of a DAG is called a DAG run. Airflow immediately executes a DAG run for the example DAG because the start date in the DAG file is set to yesterday. In this way, Airflow catches up to the DAG's specified schedule.
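The arithmetic behind this behavior can be sketched in Python. This is a simplified model, not Airflow's scheduler: it assumes that the first interval of a daily schedule starts at the start date and that a run becomes due once that interval has ended. The function name is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def first_run_due(start_date: datetime, interval: timedelta) -> datetime:
    """Return when the first scheduled run becomes due.

    Simplified model: the first interval starts at start_date, and the
    run is triggered once that interval has ended.
    """
    return start_date + interval


now = datetime.now(timezone.utc)
daily = timedelta(days=1)

# Start date set to yesterday: the first interval has just ended, so a run is due.
print(first_run_due(now - daily, daily) <= now)  # True

# Start date set to now: the first run would only become due a day from now.
print(first_run_due(now, daily) <= now)  # False
```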
The example DAG contains one task, print_dag_run_conf, which runs the echo command in the console. This command outputs metadata about the DAG run (its numeric identifier).
Run the following command in Google Cloud CLI. This command lists DAG runs for the composer_quickstart DAG:
gcloud composer environments run example-environment \
  --location us-central1 \
  dags list-runs -- --dag-id composer_quickstart
Example output:
dag_id              | run_id                                      | state   | execution_date                   | start_date                       | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00
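To read the columns of this output, it helps to see how the values relate. The following Python sketch assumes that run_id has the form `<run type>__<timestamp>`, as in the example row; the function name is hypothetical, and the values are copied from the example output.

```python
from datetime import datetime


def split_run_id(run_id: str) -> tuple[str, datetime]:
    """Split a run_id such as 'scheduled__<timestamp>' into its two parts."""
    run_type, separator, timestamp = run_id.partition("__")
    if not separator:
        raise ValueError(f"unexpected run_id format: {run_id!r}")
    return run_type, datetime.fromisoformat(timestamp)


# Values copied from the example output.
run_type, logical_date = split_run_id("scheduled__2024-02-17T15:38:38.969307+00:00")
start_date = datetime.fromisoformat("2024-02-18T15:38:39.526707+00:00")
end_date = datetime.fromisoformat("2024-02-18T15:38:42.020661+00:00")

print(run_type)                   # scheduled
print(logical_date.isoformat())   # 2024-02-17T15:38:38.969307+00:00
print(start_date - logical_date)  # 1 day, 0:00:00.557400
print(end_date - start_date)      # 0:00:02.493954
```

In this example, the run started about one day after its execution date, which matches a daily schedule with a start date of yesterday, and the run itself took about 2.5 seconds.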
Airflow CLI does not provide a command to view task logs. You can use other methods to view Airflow task logs: Cloud Composer DAG UI, Airflow UI, or Cloud Logging. This guide shows a way to query Cloud Logging for logs from a specific DAG run.
Run the following command in Google Cloud CLI. This command reads logs from Cloud Logging for a specific DAG run of the composer_quickstart DAG. The --format argument formats the output so that only the text of the log message is displayed.
gcloud logging read \
  --format="value(textPayload)" \
  --order=asc \
  "resource.type=cloud_composer_environment \
  resource.labels.location=us-central1 \
  resource.labels.environment_name=example-environment \
  labels.workflow=composer_quickstart \
  (labels.\"execution-date\"=\"RUN_ID\")"
Replace RUN_ID with the timestamp from the run_id value in the output of the dags list-runs command that you ran previously (the same value appears in the execution_date column). For example, 2024-02-17T15:38:38.969307+00:00.
Example output:
...
Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17 15:38:38.969307+00:00
Started process 22544 to run task
...
Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746
...
Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart, task_id=print_dag_run_conf, execution_date=20240217T153838, start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check
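If you query logs for several DAG runs, building the filter by hand becomes error-prone. The following Python sketch assembles the same filter and prints the resulting gcloud command line without running it. The function name and its parameters are hypothetical; the filter clauses are the ones from the command shown earlier in this section.

```python
import shlex


def build_log_filter(location, environment, dag_id, execution_date):
    """Build the Cloud Logging filter for one DAG run."""
    clauses = [
        "resource.type=cloud_composer_environment",
        f"resource.labels.location={location}",
        f"resource.labels.environment_name={environment}",
        f"labels.workflow={dag_id}",
        # The label name contains a hyphen, so it is quoted inside the filter.
        f'(labels."execution-date"="{execution_date}")',
    ]
    # Clauses are separated by spaces, as in the shell command.
    return " ".join(clauses)


log_filter = build_log_filter(
    "us-central1",
    "example-environment",
    "composer_quickstart",
    "2024-02-17T15:38:38.969307+00:00",
)
argv = [
    "gcloud", "logging", "read",
    "--format=value(textPayload)",
    "--order=asc",
    log_filter,
]
# shlex.join quotes each argument so the line can be pasted into a shell.
print(shlex.join(argv))
```

Passing the arguments as a list (for example, to subprocess.run) avoids the nested quoting that the shell version needs around the execution-date label.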
Clean up
To avoid incurring charges to your Google Cloud account for the resources used on this page, delete the Google Cloud project with the resources.
Delete the resources used in this tutorial:
- Delete the Cloud Composer environment:
  - In the Google Cloud console, go to the Environments page.
  - Select example-environment and click Delete.
  - Wait until the environment is deleted.
- Delete your environment's bucket. Deleting the Cloud Composer environment does not delete its bucket.
  - In the Google Cloud console, go to the Storage > Browser page.
  - Select the environment's bucket and click Delete. For example, this bucket can be named us-central1-example-environ-c1616fe8-bucket.
- Delete the persistent disk of your environment's Redis queue. Deleting the Cloud Composer environment does not delete its persistent disk.
  - In the Google Cloud console, go to the Compute Engine > Disks page.
  - Select the environment's Redis queue persistent disk and click Delete. For example, this disk can be named pvc-02bc4842-2312-4347-8519-d87bdcd31115. Disks for Cloud Composer 2 always have the Balanced persistent disk type and a size of 2 GB.
What's next