在 Cloud Composer 3 中运行 Apache Airflow DAG

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

本快速入门指南介绍了如何在 Cloud Composer 3 中创建 Cloud Composer 环境并运行 Apache Airflow DAG。

准备工作

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. 确保您的 Google Cloud 项目已启用结算功能

  4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  5. 确保您的 Google Cloud 项目已启用结算功能

  6. Enable the Cloud Composer API.

    Enable the API

  7. 如需获得完成本快速入门所需的权限,请让您的管理员为您授予项目的以下 IAM 角色:

    如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

    您也可以通过自定义角色或其他预定义角色来获取所需的权限。

创建环境

  1. 在 Google Cloud 控制台中,前往创建环境页面。

    转到“创建环境”

  2. 名称字段中,输入 example-environment

  3. 位置下拉列表中,为 Cloud Composer 环境选择一个区域。本指南使用的是 us-central1 区域。

  4. 对于其他环境配置选项,请使用提供的默认值。

  5. 点击创建,然后等待环境创建完成。

  6. 完成后,相应环境名称旁边会显示一个绿色对勾标记。

创建 DAG 文件

Airflow DAG 是您要安排和运行的有序任务的集合。DAG 在标准 Python 文件中定义。

本指南使用 quickstart.py 文件中定义的 Airflow DAG 示例。此文件中的 Python 代码将执行以下操作:

  1. 创建一个 DAG composer_sample_dag。此 DAG 每天运行一次。
  2. 执行一项 print_dag_run_conf 任务。该任务使用 bash 运算符输出 DAG 运行的配置。

在本地机器上保存 quickstart.py 文件的副本:

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

将 DAG 文件上传到环境的存储桶

每个 Cloud Composer 环境都有一个关联的 Cloud Storage 存储桶。Cloud Composer 中的 Airflow 只会安排位于此存储桶的 /dags 文件夹中的 DAG。

如需安排 DAG,请将 quickstart.py 从本地机器上传到您的环境的 /dags 文件夹:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称 example-environment环境详情页面随即打开。

  3. 点击打开 DAGs 文件夹。系统随即会打开存储分区详情页面。

  4. 点击上传文件,然后选择 quickstart.py 的副本。

  5. 如需上传该文件,请点击打开

查看 DAG

上传 DAG 文件后,Airflow 会执行以下操作:

  1. 解析您上传的 DAG 文件。DAG 可能需要几分钟才能供 Airflow 使用。
  2. 将 DAG 添加到可用 DAG 列表。
  3. 根据您在 DAG 文件中提供的时间表执行 DAG。

在 DAG 界面中查看 DAG,检查 DAG 是否已处理完毕且是否可在 Airflow 中使用。DAG 界面是 Cloud Composer 界面,用于在 Google Cloud 控制台中查看 DAG 信息。Cloud Composer 还提供对 Airflow 界面的访问权限,该界面是原生 Airflow Web 界面。

  1. 等待大约 5 分钟,让 Airflow 有时间处理 DAG 文件 完成第一次 DAG 运行 (稍后介绍)。

  2. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  3. 在环境列表中,点击您的环境名称 example-environment环境详情页面随即打开。

  4. 转到 DAG 标签页。

  5. 检查 composer_quickstart DAG 是否在 DAG 列表中。

    DAG 列表显示了 composer_quickstart DAG,以及状态和时间表等其他信息
    图 1. DAG 列表显示 composer_quickstart DAG(点击可放大)

查看 DAG 运行作业详情

DAG 的单次执行称为 DAG 运行。由于 DAG 文件中的开始日期设置为昨天,因此 Airflow 会立即为示例 DAG 执行 DAG 运行。这样,Airflow 能够与指定的 DAG 同步 时间表。

示例 DAG 包含一个任务 print_dag_run_conf,该任务会在控制台中运行 echo 命令。此命令会输出有关 DAG 的元信息 (DAG 运行的数字标识符)。

  1. DAG 标签页上,点击 composer_quickstartRuns 标签页 DAG 随即打开。

  2. 在 DAG 运行列表中,点击第一个条目。

    DAG 运行列表会显示最近的 DAG 运行作业(其执行日期)
    和状态)
    图 2. 针对 composer_quickstart DAG(点击可放大)
  3. 系统会显示 DAG 运行详情,其中详细介绍了示例 DAG 的各个任务的相关信息。

    包含条目 print_dag_run_conf 的任务列表、其开始时间、结束时间和时长
    图 3. 在 DAG 运行作业(点击可放大)
  4. DAG 运行的日志部分会列出 DAG 运行中的所有任务的日志。您可以在日志中查看 echo 命令的输出。

    任务的日志条目,其中一个是“输出”,另一个列出了标识符
    图 4. print_dag_run_conf 任务的日志(点击可放大)

清理

为避免因本页中使用的资源导致您的 Google Cloud 账号产生费用,请按照以下步骤操作。

删除本教程中使用的资源

  1. 删除 Cloud Composer 环境:

    1. 在 Google Cloud 控制台中,前往环境页面。

      转到“环境”

    2. 选择 example-environment,并点击删除

    3. 等待环境删除完成。

  2. 删除环境的存储桶。删除 Cloud Composer 环境不会删除其存储分区。

    1. 在 Google Cloud 控制台中,前往存储 >浏览器页面。

      转到“存储”>“浏览器”

    2. 选择环境的存储桶,然后点击删除。例如,此存储桶可以命名为 us-central1-example-environ-c1616fe8-bucket

  3. 删除您的环境的 Redis 队列的永久性磁盘。删除 Cloud Composer 环境并不会删除其永久性磁盘。

    1. 在 Google Cloud 控制台中,依次选择 Compute Engine > 磁盘

      转到“磁盘”

    2. 选择环境的 Redis 队列永久性磁盘,然后点击删除

      例如,此磁盘可以命名为 gke-us-central1-exampl-pvc-b12055b6-c92c-43ff-9de9-10f2cc6fc0ee。适用于 Cloud Composer 1 的磁盘始终为 Standard persistent disk 类型,大小为 2 GB。

后续步骤