快速入门

本页面介绍如何在 Google Cloud Console 中创建 Cloud Composer 环境并运行简单的 Apache Airflow DAG(也称为工作流)。

准备工作

  1. 登录您的 Google 帐号。

    如果您还没有 Google 帐号,请注册一个新帐号

  2. 在 Google Cloud Console 的项目选择器页面上,选择或创建一个 Google Cloud 项目。

    转到项目选择器页面

  3. 确保您的 Cloud 项目已启用结算功能。 了解如何确认您的项目是否已启用结算功能

  4. 启用 Cloud Composer API。

    启用 API

创建环境

控制台

  1. 在 Cloud Console 中,转到“创建环境”页面。

    打开“创建环境”页面

  2. 名称字段中,输入 example-environment

  3. 位置下拉列表中,为 Cloud Composer 环境选择一个区域。如需了解如何选择区域,请参阅可用区域

  4. 对于其他环境配置选项,请使用提供的默认值。

  5. 如需创建此环境,请点击创建

  6. 等待环境创建操作完成。完成后,相应环境名称左侧会显示绿色对勾标记。

gcloud

gcloud composer environments create example-environment \
    --location LOCATION

LOCATION 替换为环境所在的 Compute Engine 区域。确保您指定的位置是 Composer 可用的位置。

Terraform

要使用 Terraform 配置此环境,请将以下资源块添加到 Terraform 配置并运行 terraform apply

resource "google_composer_environment" "composer-quickstart" {
    name   = "example-environment"
    region = "LOCATION"
}

LOCATION 替换为环境所在的 Compute Engine 区域。确保您指定的位置是 Composer 可用的位置。

查看环境详情

创建完环境后,您可以查看环境的部署信息,例如 Cloud Composer 版本、Airflow 网页界面网址以及 Cloud Storage 中的 DAG 文件夹。

如需查看部署信息,请执行以下操作:

  1. 在 Cloud Console 中,转到“环境”页面。

    打开“环境”页面

  2. 如需查看“环境详情”页面,请点击 example-environment

创建 DAG

Airflow DAG 是您要安排和运行的有序任务的集合。DAG 在标准 Python 文件中定义。

quickstart.py 中的 Python 代码执行以下操作:

  1. 创建一个 DAG composer_sample_dag。此 DAG 每天运行一次。
  2. 执行一项 print_dag_run_conf 任务。该任务使用 bash 运算符输出 DAG 运行的配置。

如需创建 DAG,请在本地机器上创建 quickstart.py 文件的副本。

import datetime

import airflow
from airflow.operators import bash_operator

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with airflow.DAG(
        'composer_sample_dag',
        'catchup=False',
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash_operator.BashOperator(
        task_id='print_dag_run_conf', bash_command='echo {{ dag_run.id }}')

将 DAG 上传到 Cloud Storage

Cloud Composer 只会安排位于环境的 Cloud Storage 存储分区所含 DAG 文件夹中的 DAG。

如需安排您的 DAG,请将 quickstart.py 从本地机器移到环境的 DAG 文件夹中:

  1. 在 Cloud Console 中,转到“环境”页面。

    打开“环境”页面

  2. 如需打开 /dags 文件夹,请点击 example-environmentDAG 文件夹链接。

  3. 点击“存储分区详情”页面上的上传文件,然后选择 quickstart.py 的本地副本。

  4. 如需上传该文件,请点击打开

    上传您的 DAG 之后,Cloud Composer 会将该 DAG 添加到 Airflow 中并立即对其进行安排。该 DAG 可能要过几分钟才能显示在 Airflow 网页界面中。

在 Airflow 网页界面中查看 DAG

每个 Cloud Composer 环境都有一个运行 Airflow 网页界面的 Web 服务器,该服务器可用于管理 DAG。

如需在 Airflow 网页界面中查看 DAG,请执行以下操作:

  1. 在 Cloud Console 中,转到“环境”页面。

    打开“环境”页面

  2. 如需打开 Airflow 网页界面,请点击 example-environmentAirflow 链接。该界面会在新的浏览器窗口中打开。

  3. 在 Airflow 工具栏中,点击 DAGs

  4. 如需打开 DAG 详情页面,请点击 composer_sample_dag

    DAG 的页面会显示树状视图,这是工作流的任务和依赖项的图形表示。

在 Airflow 日志中查看任务实例详情

您安排的 DAG 包含 print_dag_run_conf 任务。该任务会输出 DAG 运行的配置,您可以在任务实例的 Airflow 日志中查看这项配置。

如需查看任务实例详情,请执行以下操作:

  1. 在 Airflow 网页界面的 DAG 树状视图中,点击 Graph View

    如果将鼠标悬停在表示 print_dag_run_conf 任务的图形上,则会显示该任务的状态。 请注意,任务的边框也指示状态(浅绿色边框表示任务正在运行中)。

  2. 点击 print_dag_run_conf 任务。

    此时将显示任务实例上下文菜单。 您可以在此处获取元数据并执行某些操作。

  3. 在任务实例上下文菜单中,点击 View Log

  4. 在日志中,查找 Running: ['bash' 以查看 bash 运算符的输出。

清理

为避免系统因本快速入门中使用的资源向您的 Google Cloud 帐号收取费用,请按照以下步骤操作。

  1. 在 Cloud Console 中,转到管理资源页面。

    转到“管理资源”页面

  2. 如果您打算删除的项目已附加到某个组织,请从页面顶部的组织列表中选择该组织。
  3. 在项目列表中,选择要删除的项目,然后点击删除
  4. 在对话框中输入项目 ID,然后点击关闭以删除项目。

或者,您可以删除本教程中使用的资源

  1. 删除 Cloud Composer 环境
  2. 删除 Cloud Composer 环境的 Cloud Storage 存储分区。删除 Cloud Composer 环境不会删除其存储分区。
  3. 删除 Cloud Composer 环境的 Pub/Sub 主题composer-agentcomposer-backend))。删除 Cloud Composer 环境并不会删除这些主题。

后续步骤