Quickstart


This page shows you how to create a Cloud Composer environment and run an Apache Airflow DAG in Cloud Composer.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud Console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Cloud project. Learn how to confirm that billing is enabled for your project.

  4. Enable the Cloud Composer API.

    Enable the API

Create an environment

Console

  1. In the Google Cloud Console, go to the Create environment page.

    Go to Create environment

  2. In the Name field, enter example-environment.

  3. In the Location drop-down list, select a region for the Cloud Composer environment. See Available regions for information on selecting a region.

  4. For other environment configuration options, use the provided defaults.

  5. To create the environment, click Create.

  6. Wait until the environment is created. When creation completes, a green check mark appears next to the environment name.

gcloud

gcloud composer environments create ENVIRONMENT_NAME \
    --location LOCATION

Replace:

  • ENVIRONMENT_NAME with the name of the environment.

  • LOCATION with the region for the environment.

A location is the Compute Engine region where the environment's GKE cluster is located.

Example:

gcloud composer environments create example-environment \
    --location us-central1
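
If you script environment creation, the same command can be assembled in Python. The following is a minimal sketch, not an official client: the helper name build_create_command is hypothetical, and the sketch only builds and prints the command rather than running it.

```python
import shlex
from typing import List


def build_create_command(environment_name: str, location: str) -> List[str]:
    """Return the argument list for `gcloud composer environments create`."""
    return [
        "gcloud", "composer", "environments", "create",
        environment_name,
        "--location", location,
    ]


command = build_create_command("example-environment", "us-central1")

# Print a copy-pasteable form of the command; nothing is executed here.
print(shlex.join(command))
# prints: gcloud composer environments create example-environment --location us-central1
```

To actually create the environment, you could pass the list to subprocess.run(command, check=True), assuming the gcloud CLI is installed and authenticated on the machine that runs the script.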

Terraform

To configure this environment using Terraform, add the following resource block to your Terraform configuration and run terraform apply.

For more information about using Terraform to create a Cloud Composer environment, refer to the Terraform documentation.

resource "google_composer_environment" "example" {
  name   = "ENVIRONMENT_NAME"
  region = "LOCATION"
}

Replace:

  • ENVIRONMENT_NAME with the name of the environment.

  • LOCATION with the region for the environment.

    A location is the Compute Engine region where the environment's GKE cluster is located.

Example:

resource "google_composer_environment" "example" {
  name   = "example-environment"
  region = "us-central1"
}

View environment details

After environment creation is completed, you can view your environment's information, such as the Cloud Composer version, the URL for the Airflow web interface, and the DAGs folder in Cloud Storage.

To view the environment information:

  1. In the Cloud Console, go to the Environments page.

    Open the Environments page

  2. To view the Environment details page, click example-environment.
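
The same details can also be read from the command line with gcloud composer environments describe. The sketch below builds one describe command per detail. The helper name and the DETAIL_FIELDS mapping are hypothetical, and the field paths are assumed to match the environment resource in the Cloud Composer API (config.softwareConfig.imageVersion, config.airflowUri, config.dagGcsPrefix); verify them against the output of a plain describe call.

```python
from typing import Dict, List

# Assumed field paths on the environment resource; check them against
# the output of `gcloud composer environments describe`.
DETAIL_FIELDS: Dict[str, str] = {
    "Cloud Composer image version": "config.softwareConfig.imageVersion",
    "Airflow web interface URL": "config.airflowUri",
    "DAGs folder": "config.dagGcsPrefix",
}


def build_describe_command(environment_name: str, location: str, field: str) -> List[str]:
    """Return the argument list that prints a single field of an environment."""
    return [
        "gcloud", "composer", "environments", "describe",
        environment_name,
        "--location", location,
        f"--format=get({field})",
    ]


# One command per detail; nothing is executed here.
commands = {
    label: build_describe_command("example-environment", "us-central1", field)
    for label, field in DETAIL_FIELDS.items()
}

for label, command in commands.items():
    print(f"{label}: {' '.join(command)}")
```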

Create a DAG

An Airflow DAG is a collection of organized tasks that you want to schedule and run. DAGs are defined in standard Python files.
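
To illustrate what "organized" means here, the following sketch models task dependencies with the Python standard library instead of Airflow. It is a conceptual illustration only and is separate from quickstart.py: the task names extract, transform, and load are hypothetical, and graphlib requires Python 3.9 or later.

```python
from graphlib import TopologicalSorter

# Hypothetical tasks: each key maps to the set of tasks it depends on.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}

# A valid run order executes every task after the tasks it depends on.
run_order = list(TopologicalSorter(dependencies).static_order())
print(run_order)
# prints: ['extract', 'transform', 'load']
```

A scheduler such as Airflow applies the same idea: a task becomes eligible to run only after the tasks it depends on have finished.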

The Python code in quickstart.py:

  1. Creates a DAG, composer_sample_dag. The DAG runs once per day.
  2. Executes one task, print_dag_run_conf. The task prints the DAG run's ID by using the bash operator.

To create a DAG, create a copy of the quickstart.py file on your local machine.

Airflow 2

import datetime

import airflow
from airflow.operators import bash

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with airflow.DAG(
        'composer_sample_dag',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    # Print the dag_run id to the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id='print_dag_run_conf', bash_command='echo {{ dag_run.id }}')

Airflow 1

import datetime

import airflow
from airflow.operators import bash_operator

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with airflow.DAG(
        'composer_sample_dag',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    # Print the dag_run id to the Airflow logs
    print_dag_run_conf = bash_operator.BashOperator(
        task_id='print_dag_run_conf', bash_command='echo {{ dag_run.id }}')
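
Both versions of the DAG set the start date to one day in the past and use a one-day schedule interval. The following sketch shows why this combination makes the first run due almost as soon as Airflow parses the file. It assumes Airflow's usual behavior of triggering a run once its schedule interval has elapsed. The function name and the fixed parse time are hypothetical, and the sketch uses only the standard library rather than Airflow itself.

```python
import datetime


def first_run_due(start_date: datetime.datetime,
                  schedule_interval: datetime.timedelta) -> datetime.datetime:
    """Return when the first run becomes due: the end of the first interval."""
    return start_date + schedule_interval


# Hypothetical moment at which the DAG file is parsed.
parse_time = datetime.datetime(2024, 1, 2, 12, 0)

# Same arithmetic as YESTERDAY and schedule_interval in quickstart.py.
start_date = parse_time - datetime.timedelta(days=1)
due = first_run_due(start_date, datetime.timedelta(days=1))

# The first interval ends exactly at parse time, so a run is due right away.
print(due == parse_time)
# prints: True
```

With a start date further in the past, more intervals would already have elapsed. The catchup setting on the DAG controls whether Airflow creates runs for those earlier intervals.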

Upload the DAG to Cloud Storage

Cloud Composer schedules only the DAGs that are in the DAGs folder in the environment's Cloud Storage bucket.

To schedule your DAG, upload quickstart.py from your local machine to your environment's DAGs folder:

  1. In the Cloud Console, go to the Environments page.

    Open the Environments page

  2. To open the /dags folder, click the DAGs folder link for example-environment.

  3. On the Bucket details page, click Upload files and then select your local copy of quickstart.py.

  4. To upload the file, click Open.

    After you upload your DAG, Cloud Composer adds the DAG to Airflow and schedules the DAG immediately. It might take a few minutes for the DAG to show up in the Airflow web interface.
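
As an alternative to the console steps above, the upload can be scripted. The sketch below first checks that the file's source parses as Python, then builds a gcloud composer environments storage dags import command. Both helper names are hypothetical, the syntax check does not catch errors that only appear when Airflow loads the DAG, and the command is built but not executed.

```python
import ast
from typing import List


def check_python_syntax(source: str) -> bool:
    """Return True if the source parses as Python, False otherwise."""
    try:
        ast.parse(source)
    except SyntaxError:
        return False
    return True


def build_dag_import_command(environment_name: str, location: str,
                             dag_path: str) -> List[str]:
    """Return the argument list that uploads a file to the DAGs folder."""
    return [
        "gcloud", "composer", "environments", "storage", "dags", "import",
        "--environment", environment_name,
        "--location", location,
        "--source", dag_path,
    ]


# In practice, read the source from your local copy of quickstart.py.
sample_source = "import datetime\nYESTERDAY = datetime.datetime.now()\n"

if check_python_syntax(sample_source):
    command = build_dag_import_command(
        "example-environment", "us-central1", "quickstart.py")
    print(" ".join(command))
```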

View the DAG in the Airflow web interface

Each Cloud Composer environment has a web server that runs the Airflow web interface, which you can use to manage DAGs.

To view the DAG in the Airflow web interface:

  1. In the Cloud Console, go to the Environments page.

    Open the Environments page

  2. To open the Airflow web interface, click the Airflow link for example-environment. The interface opens in a new browser window.

  3. In the Airflow toolbar, click DAGs.

  4. To open the DAG details page, click composer_sample_dag.

    The page for the DAG shows the Tree View, a graphical representation of the workflow's tasks and dependencies.

View task instance details in the Airflow logs

The DAG that you scheduled includes the print_dag_run_conf task. The task prints the DAG run's ID, which you can see in the Airflow logs for the task instance.

To view the task instance details:

  1. In the DAG's Tree View in the Airflow web interface, click Graph View.

    If you hold the pointer over the graphic for the print_dag_run_conf task, its status is displayed. The border around the task also indicates the status (a light green border means running).

  2. Click the print_dag_run_conf task.

    The Task Instance Context Menu displays. Here you can get metadata and perform some actions.

  3. In the Task Instance Context Menu, click View Log.

  4. In the Log, look for Running: ['bash' to see the output from the bash operator.

Clean up

To avoid incurring charges to your Google Cloud account for the resources used on this page, follow these steps.

  1. In the Cloud Console, go to the Manage resources page.

    Go to Manage resources

  2. If the project that you plan to delete is attached to an organization, expand the Organization list in the Name column.
  3. In the project list, select the project that you want to delete, and then click Delete.
  4. In the dialog, type the project ID, and then click Shut down to delete the project.

Alternatively, you can delete the resources used in this tutorial:

  1. Delete the Cloud Composer environment.
  2. Delete the Cloud Storage bucket for the Cloud Composer environment. Deleting the Cloud Composer environment does not delete its bucket.
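
The two deletion steps above can be sketched as commands. The example below derives the bucket name from the DAGs folder URI, then builds one command for the environment and one for the bucket. The helper names and the gs://example-bucket/dags URI are hypothetical placeholders; use the DAGs folder shown on your Environment details page. The sketch assumes the gcloud and gsutil tools are installed, and it does not execute anything.

```python
from typing import List
from urllib.parse import urlparse


def bucket_from_dags_folder(dags_folder_uri: str) -> str:
    """Extract the bucket name from a URI such as gs://BUCKET/dags."""
    parsed = urlparse(dags_folder_uri)
    if parsed.scheme != "gs" or not parsed.netloc:
        raise ValueError(f"Not a Cloud Storage URI: {dags_folder_uri!r}")
    return parsed.netloc


def build_cleanup_commands(environment_name: str, location: str,
                           dags_folder_uri: str) -> List[List[str]]:
    """Return the commands that delete the environment, then its bucket."""
    bucket = bucket_from_dags_folder(dags_folder_uri)
    return [
        ["gcloud", "composer", "environments", "delete",
         environment_name, "--location", location],
        ["gsutil", "rm", "-r", f"gs://{bucket}"],
    ]


# Hypothetical DAGs folder URI, for illustration only.
cleanup = build_cleanup_commands(
    "example-environment", "us-central1", "gs://example-bucket/dags")

for command in cleanup:
    print(" ".join(command))
```

Note the DAGs folder URI before you delete the environment, because the bucket remains after the environment is gone and you need its name to remove it.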

What's next