在 Cloud Composer 3 中运行 Apache Airflow DAG

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

本快速入门指南介绍了如何在 Cloud Composer 3 中创建 Cloud Composer 环境并运行 Apache Airflow DAG。

准备工作

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. 确保您的 Google Cloud 项目已启用结算功能

  4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  5. 确保您的 Google Cloud 项目已启用结算功能

  6. 启用 Cloud Composer API。

    启用 API

  7. 如需获取完成本快速入门所需的权限,请让管理员向您授予项目的以下 IAM 角色:

    如需详细了解如何授予角色,请参阅管理访问权限

    您也可以通过自定义角色或其他预定义角色来获取所需的权限。

创建环境

  1. 在 Google Cloud 控制台中,前往创建环境页面。

    转到“创建环境”

  2. 名称字段中,输入 example-environment

  3. 位置下拉列表中,为 Cloud Composer 环境选择一个区域。本指南使用的是 us-central1 区域。

  4. 对于其他环境配置选项,请使用提供的默认值。

  5. 点击创建,然后等待环境创建完成。

  6. 完成后,环境名称旁边会显示一个绿色对勾标记。

创建 DAG 文件

Airflow DAG 是您要安排和运行的有序任务的集合。DAG 在标准 Python 文件中定义。

本指南使用的是 quickstart.py 文件中定义的示例 Airflow DAG。此文件中的 Python 代码将执行以下操作:

  1. 创建一个 DAG composer_sample_dag。此 DAG 每天运行一次。
  2. 执行一项 print_dag_run_conf 任务。该任务使用 bash 运算符输出 DAG 运行的配置。

在本地机器上保存 quickstart.py 文件的副本:

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

将 DAG 文件上传到环境的存储桶

每个 Cloud Composer 环境都有一个关联的 Cloud Storage 存储桶。Cloud Composer 中的 Airflow 仅安排位于此存储桶的 /dags 文件夹中的 DAG。

如需安排您的 DAG,请将 quickstart.py 从本地机器上传到环境的 /dags 文件夹:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称 example-environment。系统随即会打开环境详情页面。

  3. 点击打开 DAGs 文件夹。系统随即会打开存储分区详情页面。

  4. 点击上传文件,然后选择 quickstart.py 副本。

  5. 如需上传该文件,请点击打开

查看 DAG

上传 DAG 文件后,Airflow 会执行以下操作:

  1. 解析您上传的 DAG 文件。DAG 可能需要几分钟才能供 Airflow 使用。
  2. 将 DAG 添加到可用 DAG 列表中。
  3. 根据您在 DAG 文件中提供的时间表执行 DAG。

通过在 DAG 界面中查看 DAG,检查您的 DAG 是否处理成功,以及是否可在 Airflow 中使用。DAG 界面是 Cloud Composer 界面,用于在 Google Cloud 控制台中查看 DAG 信息。Cloud Composer 还提供 Airflow 界面访问权限,该界面是原生 Airflow 网页界面。

  1. 等待大约 5 分钟,让 Airflow 有时间处理您之前上传的 DAG 文件,并完成第一次 DAG 运行(稍后介绍)。

  2. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  3. 在环境列表中,点击您的环境名称 example-environment。系统随即会打开环境详情页面。

  4. 转到 DAG 标签页。

  5. 检查 DAG 列表中是否存在 composer_quickstart DAG。

    DAG 列表会显示 composer_quickstart DAG 以及状态和时间表等其他信息
    图 1.DAG 列表会显示 composer_quickstart DAG(点击可放大)

查看 DAG 运行详情

DAG 的单次执行称为“DAG 运行”。Airflow 会立即执行示例 DAG 的 DAG 运行,因为 DAG 文件中的开始日期设置为昨天。通过这种方式,Airflow 赶上指定 DAG 的时间表。

示例 DAG 包含一项 print_dag_run_conf 任务,该任务在控制台中运行 echo 命令。此命令会输出有关 DAG 的元信息(DAG 运行的数字标识符)。

  1. DAG 标签页上,点击 composer_quickstart。此时将打开 DAG 的 Runs 标签页。

  2. 在 DAG 运行列表中,点击第一个条目。

    DAG 运行列表会显示最近的 DAG 运行作业(其执行日期和状态)
    图 2.针对 composer_quickstart DAG 的 DAG 运行列表(点击可放大)
  3. 系统随即会显示 DAG 运行详细信息,其中详细说明了示例 DAG 的各个任务的相关信息。

    带有一个 print_dag_run_conf 条目及其开始时间、结束时间和时长的任务列表
    图 3.在 DAG 运行中执行的任务列表(点击可放大)
  4. DAG 运行日志部分列出了 DAG 运行中所有任务的日志。您可以在日志中查看 echo 命令的输出。

    任务的日志条目,一个是输出条目,另一个列出标识符
    图 4.print_dag_run_conf 任务的日志(点击可放大)

清理

为避免因本页中使用的资源导致您的 Google Cloud 账号产生费用,请按照以下步骤操作。

删除本教程中使用的资源

  1. 删除 Cloud Composer 环境:

    1. 在 Google Cloud 控制台中,前往环境页面。

      转到“环境”

    2. 选择 example-environment,并点击删除

    3. 等待环境删除完成。

  2. 删除环境的存储桶。删除 Cloud Composer 环境不会删除其存储桶。

    1. 在 Google Cloud 控制台中,前往存储 > 浏览器页面。

      转到“存储”>“浏览器”

    2. 选择环境的存储桶,然后点击删除。例如,此存储桶可以命名为 us-central1-example-environ-c1616fe8-bucket

  3. 删除您的环境的 Redis 队列的永久性磁盘。删除 Cloud Composer 环境并不会删除其永久性磁盘。

    1. 在 Google Cloud 控制台中,转到 Compute Engine > 磁盘

      转到“磁盘”

    2. 选择环境的 Redis 队列永久性磁盘,然后点击删除

      例如,此磁盘可以命名为 gke-us-central1-exampl-pvc-b12055b6-c92c-43ff-9de9-10f2cc6fc0ee。适用于 Cloud Composer 1 的磁盘始终为 Standard persistent disk 类型,大小为 2 GB。

后续步骤