在 Cloud Composer 2 中运行 Apache Airflow DAG
Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
本快速入门指南介绍了如何创建 Cloud Composer 环境 并在 Cloud Composer 2 中运行 Apache Airflow DAG。
如果您刚接触 Airflow,请参阅 Apache Airflow 中的 Airflow 概念教程 关于 Airflow 概念、对象和 使用情况。
如果您想改用 Google Cloud CLI,请参阅 在 Cloud Composer (Google Cloud CLI) 中运行 Apache Airflow DAG。
如果要使用 Terraform 创建环境,请参阅 创建环境 (Terraform)。
准备工作
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Cloud Composer API.
-
如需获得完成本快速入门所需的权限,请让您的管理员为您授予项目的以下 IAM 角色:
-
如需查看、创建和管理 Cloud Composer 环境,请执行以下操作:
-
环境和存储对象管理员 (
roles/composer.environmentAndStorageObjectAdmin
) -
Service Account User (
roles/iam.serviceAccountUser
)
-
环境和存储对象管理员 (
-
如需查看日志,请使用以下角色:
Logs Viewer (
roles/logging.viewer
)
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
-
如需查看、创建和管理 Cloud Composer 环境,请执行以下操作:
创建环境
在 Google Cloud 控制台中,前往创建环境页面。
如果这是您项目中的第一个环境 向 Cloud Composer 服务账号授予所需的权限 部分。
您将 Cloud Composer Service Agent 账号添加为环境服务账号的新主账号,并向其授予 Cloud Composer v2 API Service Agent Extension 角色。
确认您为环境使用预期的服务账号。 然后点击授权。
在名称字段中,输入
example-environment
。在位置下拉列表中,为 Cloud Composer 环境选择一个区域。本指南使用的是
us-central1
区域。对于其他环境配置选项,请使用提供的默认值。
点击创建,然后等待环境创建完成。
完成后,相应环境名称旁边会显示一个绿色对勾标记。
创建 DAG 文件
Airflow DAG 是您要安排和运行的有序任务的集合。DAG 在标准 Python 文件中定义。
本指南使用 quickstart.py
文件中定义的 Airflow DAG 示例。此文件中的 Python 代码会执行以下操作:
- 创建一个 DAG
composer_sample_dag
。此 DAG 每天运行一次。 - 执行一项
print_dag_run_conf
任务。该任务使用 bash 运算符输出 DAG 运行的配置。
将 quickstart.py
文件的副本保存在本地机器上:
将 DAG 文件上传到环境的存储桶
每个 Cloud Composer 环境都有一个 Cloud Storage
存储桶仅限 Cloud Composer 时间表中的 Airflow
位于此存储桶的 /dags
文件夹中的 DAG。
如需安排您的 DAG,请将 quickstart.py
从本地机器上传到您的
环境的 /dags
文件夹:
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称
example-environment
。系统随即会打开环境详情页面。点击打开 DAGs 文件夹。系统随即会打开存储分区详情页面。
点击上传文件,然后选择
quickstart.py
副本。如需上传该文件,请点击打开。
查看 DAG
上传 DAG 文件后,Airflow 会执行以下操作:
- 解析您上传的 DAG 文件。系统可能需要几分钟时间 以供 Airflow 使用。
- 将 DAG 添加到可用 DAG 列表。
- 根据您在 DAG 文件中提供的时间表执行 DAG。
在 DAG 界面中查看 DAG,检查 DAG 是否已处理完毕且是否可在 Airflow 中使用。DAG 界面是 Cloud Composer 界面,用于在 Google Cloud 控制台中查看 DAG 信息。Cloud Composer 还提供对 Airflow 界面的访问权限,该界面是原生 Airflow Web 界面。
等待大约 5 分钟,让 Airflow 有时间处理 DAG 文件 完成第一次 DAG 运行 (稍后介绍)。
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称
example-environment
。系统随即会打开环境详情页面。前往 DAG 标签页。
检查
composer_quickstart
DAG 是否在 DAG 列表中。
查看 DAG 运行作业详情
DAG 的单次执行称为 DAG 运行。由于 DAG 文件中的开始日期设置为昨天,因此 Airflow 会立即为示例 DAG 执行 DAG 运行。这样,Airflow 能够与指定的 DAG 同步 时间表。
示例 DAG 包含一项 print_dag_run_conf
任务,该任务运行 echo
命令。此命令会输出有关 DAG 的元信息
(DAG 运行的数字标识符)。
在 DAG 标签页上,点击
composer_quickstart
。系统会打开 DAG 的运行标签页。在 DAG 运行列表中,点击第一个条目。
系统会显示 DAG 运行详情,其中详细介绍了示例 DAG 的各个任务的相关信息。
DAG 运行的日志部分会列出 DAG 运行中的所有任务的日志。您可以在日志中看到
echo
命令的输出。
清理
为避免因本页中使用的资源导致您的 Google Cloud 账号产生费用,请按照以下步骤操作。
删除本教程中使用的资源:
删除 Cloud Composer 环境:
在 Google Cloud 控制台中,前往环境页面。
选择
example-environment
,并点击删除。等待环境删除完成。
删除环境的存储桶。删除 Cloud Composer 环境不会删除其存储分区。
在 Google Cloud 控制台中,依次前往存储 > 浏览器页面。
选择环境的存储桶,然后点击删除。例如,此存储桶可以命名为
us-central1-example-environ-c1616fe8-bucket
。
删除您的环境的 Redis 队列的永久性磁盘。删除 Cloud Composer 环境并不会删除其永久性磁盘。
在 Google Cloud 控制台中,依次选择 Compute Engine > 磁盘。
选择环境的 Redis 队列永久性磁盘,然后点击删除。
例如,此磁盘可以命名为
pvc-02bc4842-2312-4347-8519-d87bdcd31115
。适用于 Cloud Composer 2 的磁盘始终为Balanced persistent disk
类型,大小为 2 GB。
后续步骤