在 Cloud Composer 1 中运行 Apache Airflow DAG
Cloud Composer 1 | Cloud Composer 2
本页面介绍如何在 Cloud Composer 中创建 Cloud Composer 环境并运行 Apache Airflow DAG。
如果您刚接触 Airflow,请参阅此教程详细了解 Airflow 概念、对象及其用法。
准备工作
- 登录您的 Google Cloud 帐号。如果您是 Google Cloud 新手,请创建一个帐号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
-
在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目。
-
在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目。
-
启用 Cloud Composer API。
创建环境
控制台
gcloud
将 Cloud Composer Service Agent 帐号添加为环境服务帐号的新主帐号,然后向其授予 Cloud Composer v2 API Service Agent Extension (roles/composer.ServiceAgentV2Ext
) 角色。
默认情况下,您的环境使用默认 Compute Engine 服务帐号。
# Get current project's project number
PROJECT_NUMBER=gcloud projects list \
--filter="$(gcloud config get-value project)" \
--format="value(PROJECT_NUMBER)" \
--limit=1
# Add the Cloud Composer v2 API Service Agent Extension role
gcloud iam service-accounts add-iam-policy-binding \
$PROJECT_NUMBER-compute@developer.gserviceaccount.com \
--member serviceAccount:service-$PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com \
--role roles/composer.ServiceAgentV2Ext
创建新环境:
gcloud composer environments create ENVIRONMENT_NAME \
--location LOCATION \
--image-version IMAGE_VERSION
您需要将其中的:
ENVIRONMENT_NAME
替换为环境的名称。 本快速入门使用的是example-environment
。- 将
LOCATION
替换为用于 Cloud Composer 环境的某个区域。如需了解如何选择区域,请参阅可用区域。 - 将
IMAGE_VERSION
替换为 Cloud Composer 映像的名称。本指南使用composer-1.20.12-airflow-1.10.15
创建一个包含最新 Cloud Composer 映像的环境。
示例:
gcloud composer environments create example-environment \
--location us-central1 \
--image-version composer-1.20.12-airflow-1.10.15
Terraform
要使用 Terraform 配置此环境,请将以下资源块添加到 Terraform 配置并运行 terraform apply
。
如需使用此资源块,Terraform 使用的服务帐号必须具有一个角色,并且启用了 composer.environments.create
权限。如需详细了解 Terraform 的服务帐号,请参阅 Google 提供程序配置参考。
如需详细了解如何使用 Terraform 创建 Cloud Composer 环境,请参阅 Terraform 文档。
resource "google_composer_environment" "example" {
name = "ENVIRONMENT_NAME"
region = "LOCATION"
config {
software_config {
image_version = "IMAGE_VERSION"
}
}
}
ENVIRONMENT_NAME
替换为环境的名称。 本快速入门使用的是example-environment
。将
LOCATION
替换为用于 Cloud Composer 环境的某个区域。如需了解如何选择区域,请参阅可用区域。将
IMAGE_VERSION
替换为 Cloud Composer 映像的名称。本指南使用composer-1.20.12-airflow-1.10.15
创建一个包含最新 Cloud Composer 映像的环境。
示例:
resource "google_composer_environment" "example" {
name = "example-environment"
region = "us-central1"
config {
software_config {
image_version = "composer-1.20.12-airflow-1.10.15"
}
}
}
查看环境详情
创建完环境后,您可以查看环境的信息,例如 Cloud Composer 版本、Airflow 网页界面的网址,以及 Cloud Storage 中的 DAGs 文件夹。
如需查看环境信息,请执行以下操作:
在 Google Cloud 控制台中,转到环境页面。
如需查看环境详情页面,请点击您的环境名称
example-environment
。
创建 DAG
Airflow DAG 是您要安排和运行的有序任务的集合。DAG 在标准 Python 文件中定义。
quickstart.py
中的 Python 代码执行以下操作:
- 创建一个 DAG
composer_sample_dag
。此 DAG 每天运行一次。 - 执行一项
print_dag_run_conf
任务。该任务使用 bash 运算符输出 DAG 运行的配置。
如需创建 DAG,请在本地机器上创建 quickstart.py
文件的副本。
Airflow 1
Airflow 2
将 DAG 上传到 Cloud Storage
Cloud Composer 只会安排位于环境的 Cloud Storage 存储桶所含 /dags
文件夹中的 DAG。
如需安排 DAG,请将 quickstart.py
从本地机器上传到您的环境的 /dags
文件夹。
控制台
在 Google Cloud 控制台中,转到环境页面。
如需打开
/dags
文件夹,请访问example-environment
的 DAGs 文件夹链接。点击“存储分区详情”页面上的上传文件,然后选择
quickstart.py
的本地副本。如需上传该文件,请点击打开。
上传您的 DAG 之后,Cloud Composer 会将该 DAG 添加到 Airflow 中并立即安排一个 DAG 运行。该 DAG 可能要过几分钟才能显示在 Airflow 网页界面中。
gcloud
要使用 gcloud
上传 quickstart.py
,请运行以下命令:
gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py
在 Airflow 界面中查看 DAG。
每个 Cloud Composer 环境都有一个 Web 服务器,用于运行 Airflow 网页界面。您可以通过 Airflow 网页界面管理 DAG。
如需在 Airflow 网页界面中查看 DAG,请执行以下操作:
Airflow 1
在 Google Cloud 控制台中,转到环境页面。
如需打开 Airflow 网页界面,请点击
example-environment
的 Airflow 链接。Airflow 界面会在新的浏览器窗口中打开。在 Airflow 工具栏中,转到 DAG 页面。
如需打开 DAG 详情页面,请点击
composer_sample_dag
。图 1:Airflow 界面中的 DAG 页面(点击可放大) DAG 的页面会显示树状视图,这是工作流的任务和依赖项的图形表示。
图 2:composer_sample_dags DAG 的树状视图
Airflow 2
在 Google Cloud 控制台中,转到环境页面。
如需打开 Airflow 网页界面,请点击
example-environment
的 Airflow 链接。Airflow 界面会在新的浏览器窗口中打开。在 Airflow 工具栏中,转到 DAG 页面。
如需打开 DAG 详情页面,请点击
composer_sample_dag
。图 1:Airflow 界面中的 DAG 页面(点击可放大) DAG 的页面会显示树状视图,这是工作流的任务和依赖项的图形表示。
图 2:composer_sample_dags DAG 的树状视图
在 Airflow 日志中查看任务实例详情
您安排的 DAG 包含 print_dag_run_conf
任务。该任务会输出 DAG 运行的配置,您可以在任务实例的 Airflow 日志中查看这项配置。
如需查看任务实例详情,请执行以下操作:
Airflow 1
在 Airflow 网页界面的 DAG 树状视图中,点击 Graph View。
如果将鼠标指针悬停在
print_dag_run_conf
任务上,系统会显示该任务的状态。图 3. print_dag_run_conf 任务的状态 点击
print_dag_run_conf
任务。在任务实例上下文菜单中,您可以获取元数据,并执行某些操作。
图 4.composer_sample_dags 任务的任务实例上下文菜单 在任务实例上下文菜单中,点击 View Log。
在日志中,查找
Running: ['bash'
以查看 bash 运算符的输出。图 5.Bash 运算符日志输出
Airflow 2
在 Airflow 网页界面的 DAG 树状视图中,点击 Graph View。
如果将鼠标指针悬停在
print_dag_run_conf
任务上,系统会显示该任务的状态。图 3. print_dag_run_conf 任务的状态 点击
print_dag_run_conf
任务。在任务实例上下文菜单中,您可以获取元数据,并执行某些操作。
图 4.composer_sample_dags 任务的任务实例上下文菜单 在任务实例上下文菜单中,点击 View Log。
在日志中,查找
Running command: ['bash'
以查看 bash 运算符的输出。[2021-10-04 15:27:21,029] {subprocess.py:63} INFO - Running command: ['bash', '-c', 'echo 735'] [2021-10-04 15:27:21,167] {subprocess.py:74} INFO - Output: [2021-10-04 15:27:21,168] {subprocess.py:78} INFO - 735 [2021-10-04 15:27:21,168] {subprocess.py:82} INFO - Command exited with return code 0
清理
为避免因本页中使用的资源导致您的 Google Cloud 账号产生费用,请按照以下步骤操作。
删除本教程中使用的资源:
删除 Cloud Composer 环境:
在 Google Cloud 控制台中,转到环境页面。
选择
example-environment
,并点击删除。等待环境删除完成。
删除环境的存储桶。删除 Cloud Composer 环境不会删除其存储桶。
在 Google Cloud 控制台中,转到存储 > 浏览器页面。
选择环境的存储桶,然后点击删除。例如,此存储桶可以命名为
us-central1-example-environ-c1616fe8-bucket
。
删除您的环境的 Redis 队列的永久性磁盘。删除 Cloud Composer 环境并不会删除其永久性磁盘。
在 Google Cloud 控制台中,转到 Compute Engine > 磁盘。
选择环境的 Redis 队列永久性磁盘,然后点击删除。
例如,此磁盘可以命名为
gke-us-central1-exampl-pvc-b12055b6-c92c-43ff-9de9-10f2cc6fc0ee
。适用于 Cloud Composer 1 的磁盘始终为Standard persistent disk
类型,大小为 2 GB。