Quickstart

This page shows you how to create and run an Apache Airflow workflow in Cloud Composer that completes the following tasks:

  1. Creates a Cloud Dataproc cluster
  2. Runs an Apache Hadoop wordcount job on the cluster, and outputs its results to Cloud Storage
  3. Deletes the cluster

This page also shows you how to access your Cloud Composer environment through the Google Cloud Platform Console, Cloud SDK, and Airflow web interface.

Before you begin

  1. Sign in to your Google account.

    If you don't already have an account, sign up for a new one.

  2. Select or create a GCP project.

    Go to the Manage resources page

  3. Enable the Cloud Composer, Cloud Dataproc, and Cloud Storage APIs.

    Enable the APIs

  4. To run gcloud composer commands in a local terminal window, install the Cloud SDK on your client machine. You can also run the commands in Cloud Shell.
  5. Create a Cloud Composer environment and wait until environment creation completes.

    It can take up to one hour to deploy the Airflow web interface in the Cloud Composer environment. The environment creation process is complete when a green checkmark displays to the left of the environment name on the Environments page in the GCP Console.

  6. Create a Cloud Storage bucket in your project and note the bucket path, such as gs://my-bucket, to use later.
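
For example, you can create the environment (step 5) and the bucket (step 6) from the command line. This is a minimal sketch; the environment name test-environment, the location us-central1, and the bucket name gs://my-bucket are placeholders, so substitute your own values:

    # Create a Cloud Composer environment (creation can take a while to finish).
    gcloud composer environments create test-environment \
        --location us-central1

    # Create a Cloud Storage bucket in the same region for the wordcount output.
    gsutil mb -l us-central1 gs://my-bucket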

Viewing environment information

  1. In the GCP Console, open the Environments page.

    Open the Environments page

  2. Click the name of the environment to see its details.

    The Environment details page provides information, such as the Airflow web interface URL, Google Kubernetes Engine cluster ID, name of the Cloud Storage bucket, and path for the /dags folder.

Setting Airflow variables

Airflow variables are an Airflow-specific concept that is distinct from environment variables. In this step, you'll set the following three Airflow variables: gcp_project, gcs_bucket, and gce_zone.
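
The DAG in this quickstart reads these values with models.Variable.get. As a brief illustration (the variable names match the ones you set in this step):

    from airflow import models

    # Airflow variables set through the Airflow CLI or UI are available to DAG code.
    project = models.Variable.get('gcp_project')
    bucket = models.Variable.get('gcs_bucket')
    zone = models.Variable.get('gce_zone')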

To set Airflow variables using the gcloud command-line tool, use the gcloud composer environments run command with the variables sub-command. This gcloud composer command executes the Airflow CLI sub-command variables in your environment; the arguments after the -- separator are passed through to the Airflow sub-command.

     gcloud composer environments run ENVIRONMENT_NAME \
     --location LOCATION variables -- \
     --set KEY VALUE

where:

  • ENVIRONMENT_NAME is the name of the environment.
  • LOCATION is the Compute Engine region where the environment is located. You must include the --location flag or set a default location before running the gcloud composer command.
  • KEY and VALUE specify the variable to set and its value. Separate the gcloud-related arguments on the left from the Airflow sub-command arguments on the right with a space, two dashes, and a space ( -- ). Also include a space between the KEY and VALUE arguments.

For example:

    gcloud composer environments run test-environment \
        --location us-central1 variables -- --set gcp_project project-id

To set the three variables, run the gcloud composer command once for each row in the following table:

KEY          VALUE                                              Details
gcp_project  your project ID                                    The Google Cloud Platform project you're using for this quickstart.
gcs_bucket   gs://my-bucket                                     The Cloud Storage bucket you created for this quickstart.
gce_zone     your Compute Engine zone, such as us-central1-a    The Compute Engine zone where your Cloud Dataproc cluster will be created. See Available regions & zones.
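
For example, assuming an environment named test-environment in us-central1 (substitute your own environment name, location, project ID, bucket, and zone), the three commands look like this:

    gcloud composer environments run test-environment \
        --location us-central1 variables -- --set gcp_project project-id
    gcloud composer environments run test-environment \
        --location us-central1 variables -- --set gcs_bucket gs://my-bucket
    gcloud composer environments run test-environment \
        --location us-central1 variables -- --set gce_zone us-central1-a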

To see the value of a variable, run the Airflow CLI sub-command variables with the get argument or use the Airflow UI.

For example:

    gcloud composer environments run test-environment \
        --location us-central1 variables -- --get gcs_bucket

Defining the workflow

Cloud Composer workflows are composed of DAGs (Directed Acyclic Graphs). The code shown in quickstart.py is the workflow code, also referred to as the DAG.

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
wordcount_args = ['wordcount', 'gs://pub/shakespeare/rose.txt', output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

with models.DAG(
        'composer_sample_quickstart',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/code.html#default-variables
        cluster_name='quickstart-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='quickstart-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='quickstart-cluster-{{ ds_nodash }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

To orchestrate the three workflow tasks, the DAG imports the following operators:

  1. DataprocClusterCreateOperator: Creates a Cloud Dataproc cluster.
  2. DataProcHadoopOperator: Submits a Hadoop wordcount job and writes results to a Cloud Storage bucket.
  3. DataprocClusterDeleteOperator: Deletes the cluster to avoid incurring ongoing Compute Engine charges.

The tasks run sequentially.

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

The name of the DAG is composer_sample_quickstart, and the DAG runs once each day.

with models.DAG(
        'composer_sample_quickstart',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

Because the start_date that is passed in to default_dag_args is set to yesterday, Cloud Composer schedules the workflow to start immediately after the DAG uploads.
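
The start_date is computed near the top of quickstart.py:

    yesterday = datetime.datetime.combine(
        datetime.datetime.today() - datetime.timedelta(1),
        datetime.datetime.min.time())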

Uploading the DAG to Cloud Storage

To upload the DAG:

  1. Copy and save quickstart.py on your local client machine.

  2. Upload your local copy of quickstart.py to the Cloud Storage bucket for your Cloud Composer environment.
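
For example, one way to upload the file with the gcloud command-line tool (assuming an environment named test-environment in us-central1):

    gcloud composer environments storage dags import \
        --environment test-environment --location us-central1 \
        --source quickstart.py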

Cloud Composer adds the DAG to Airflow and schedules the DAG automatically. DAG changes occur within 3-5 minutes. You can see task status in the Airflow web interface.

The rest of this guide refers to the workflow by its DAG name, composer_sample_quickstart.

Using the Airflow UI

To access the Airflow web interface using the GCP Console:

  1. Open the Environments page.

    Open the Environments page

  2. In the Airflow webserver column for the environment, click the new window icon. The Airflow web UI opens in a new browser window.

For information about the Airflow UI, see Accessing the web interface.

Viewing Variables

The variables you set earlier are persisted in your environment. You can view the variables by selecting Admin > Variables from the Airflow UI menu bar.

Exploring DAG runs

When you upload your DAG file to the dags folder in Cloud Storage, Cloud Composer parses the file. If no errors are found, the name of the workflow appears in the DAG listing, and the workflow is queued to run immediately.

Click composer_sample_quickstart to open the DAG details page. This page includes a graphical representation of workflow tasks and dependencies.

In the toolbar, click Graph View, and then hold the pointer over the graphic for each task to see its status. Note that the border around each task also indicates the status (for example, a green border means running and a red border means failed).

To run the workflow again from the Graph View:

  1. In the Airflow UI Graph View, click the create_dataproc_cluster graphic.
  2. Click Clear to reset the three tasks and then click OK to confirm.
  3. Click create_dataproc_cluster again in Graph View.
  4. Select Run to re-queue the workflow.

You can also check the status and results of the composer_sample_quickstart workflow by going to the following GCP Console pages:

  • Cloud Dataproc Clusters to monitor cluster creation and deletion. Note that the cluster created by the workflow is ephemeral: it only exists for the duration of the workflow and is deleted as part of the last workflow task.

  • Cloud Dataproc Jobs to view or monitor the Apache Hadoop wordcount job. Click the Job ID to see job log output.

  • Cloud Storage Browser to see the results of the wordcount in the wordcount folder in the Cloud Storage bucket for the environment.
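
For example, to list the wordcount output from the command line (the bucket name is a placeholder for the bucket you set as gcs_bucket):

    gsutil ls gs://my-bucket/wordcount/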

Clean up

To avoid incurring charges to your Google Cloud Platform account for the resources used in this quickstart:

  1. (Optional) To save your data, download the data from the Cloud Storage bucket for the Cloud Composer environment and the storage bucket you created for this quickstart.
  2. Delete the Cloud Storage bucket you created for this quickstart.
  3. Delete the Cloud Storage bucket for the environment.
  4. Delete the Cloud Composer environment. Note that deleting your environment does not delete the storage bucket for the environment.
  5. (Optional) Delete the project.

    1. In the GCP Console, go to the Projects page.

      Go to the Projects page

    2. In the project list, select the checkbox next to the project that you want to delete, and then click Delete project.
    3. In the dialog, type the project ID, and then click Shut down to delete the project.
