在 Cloud Composer 2 中运行 Apache Airflow DAG (Google Cloud CLI)
Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3
本快速入门指南介绍了如何创建 Cloud Composer 环境 并在 Cloud Composer 2 中运行 Apache Airflow DAG。
如果您刚接触 Airflow,请参阅 Apache Airflow 文档中的 Airflow 概念教程,详细了解 Airflow 的概念、对象及其用法。
如果您想改用 Google Cloud 控制台,请参阅 在 Cloud Composer 中运行 Apache Airflow DAG。
如果要使用 Terraform 创建环境,请参阅 创建环境 (Terraform)。
准备工作
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Cloud Composer API:
gcloud services enable composer.googleapis.com
-
如需获得完成本快速入门所需的权限,请让您的管理员为您授予项目的以下 IAM 角色:
-
如需查看创建和管理 Cloud Composer 环境,请执行以下操作:
-
Environment and Storage Object Administrator (
roles/composer.environmentAndStorageObjectAdmin
) -
Service Account User (
roles/iam.serviceAccountUser
)
-
Environment and Storage Object Administrator (
-
如需查看日志,请执行以下操作:
日志查看器 (
roles/logging.viewer
)
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
-
如需查看创建和管理 Cloud Composer 环境,请执行以下操作:
创建环境
如果这是项目中的第一个环境,请将 Cloud Composer Service Agent 账号添加为环境服务账号的新主体,并向其授予 roles/composer.ServiceAgentV2Ext
角色。
默认情况下,您的环境使用默认的 Compute Engine 服务账号,以下示例展示了如何向该账号添加此权限。
# Get current project's project number
PROJECT_NUMBER=$(gcloud projects list \
--filter="$(gcloud config get-value project)" \
--format="value(PROJECT_NUMBER)" \
--limit=1)
# Add the Cloud Composer v2 API Service Agent Extension role
gcloud iam service-accounts add-iam-policy-binding \
$PROJECT_NUMBER-compute@developer.gserviceaccount.com \
--member serviceAccount:service-$PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com \
--role roles/composer.ServiceAgentV2Ext
在 us-central1
区域中创建一个名为 example-environment
的新环境,并使用最新的 Cloud Composer 2 版本。
gcloud composer environments create example-environment \
--location us-central1 \
--image-version composer-2.9.5-airflow-2.9.1
创建 DAG 文件
Airflow DAG 是有序任务的集合, 您想定期生成报表DAG 在标准 Python 文件中定义。
本指南使用的是 quickstart.py
文件中定义的示例 Airflow DAG。
此文件中的 Python 代码会执行以下操作:
- 创建一个 DAG
composer_sample_dag
。此 DAG 每天运行一次。 - 执行一项
print_dag_run_conf
任务。该任务使用 bash 运算符输出 DAG 运行的配置。
将 quickstart.py
文件的副本保存在本地机器上:
将 DAG 文件上传到环境的存储桶
每个 Cloud Composer 环境都有一个关联的 Cloud Storage 存储桶。Cloud Composer 中的 Airflow 只会安排位于此存储桶的 /dags
文件夹中的 DAG。
如需安排您的 DAG,请将 quickstart.py
从本地机器上传到您的
环境的 /dags
文件夹:
如需使用 Google Cloud CLI 上传 quickstart.py
,请在以下位置运行以下命令:
quickstart.py
文件所在的文件夹:
gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py
查看 DAG
上传 DAG 文件后,Airflow 会执行以下操作:
- 解析您上传的 DAG 文件。DAG 可能需要几分钟才能供 Airflow 使用。
- 将 DAG 添加到可用 DAG 列表。
- 根据您在 DAG 文件中提供的时间表执行 DAG。
检查您的 DAG 是否得到处理且没有任何错误,以及是否通过 在 DAG 界面中查看它。DAG 界面是 Cloud Composer 界面,用于在 Google Cloud 控制台中查看 DAG 信息。Cloud Composer 还提供 访问 Airflow 界面,它是原生 Airflow Web 应用 界面。
等待大约五分钟,以便 Airflow 处理您之前上传的 DAG 文件,并完成首次 DAG 运行(稍后会加以说明)。
在 Google Cloud CLI 中运行以下命令。此命令将执行
dags list
Airflow CLI 命令,用于列出 环境gcloud composer environments run example-environment \ --location us-central1 \ dags list
检查命令的输出中是否列出了
composer_quickstart
DAG。输出示例:
Executing the command: [ airflow dags list ]... Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb Use ctrl-c to interrupt the command dag_id | filepath | owner | paused ====================+=======================+==================+======= airflow_monitoring | airflow_monitoring.py | airflow | False composer_quickstart | dag-quickstart-af2.py | Composer Example | False
查看 DAG 运行作业详情
DAG 的单次执行称为 DAG 运行。Airflow 立即 针对示例 DAG 执行 DAG 运行,因为 DAG 文件中的开始日期是 设为昨天。这样,Airflow 就可以赶上指定 DAG 的时间安排。
示例 DAG 包含一个任务 print_dag_run_conf
,该任务会在控制台中运行 echo
命令。此命令会输出有关 DAG 的元信息(DAG 运行的数字标识符)。
在 Google Cloud CLI 中运行以下命令。以下命令会列出 composer_quickstart
DAG 的 DAG 运行作业:
gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart
输出示例:
dag_id | run_id | state | execution_date | start_date | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00
Airflow CLI 不提供用于查看任务日志的命令。您可以使用 用于查看 Airflow 任务日志的其他方法: Cloud Composer DAG 界面、Airflow 界面或 Cloud Logging。本指南 展示了如何在 Cloud Logging 中查询来自特定 DAG 运行的日志。
在 Google Cloud CLI 中运行以下命令。此命令可从
composer_quickstart
DAG 的特定 DAG 运行作业的 Cloud Logging。--format
参数用于设置输出格式,以便仅显示日志消息的文本。
gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"
您需要将其中的:
- 将
RUN_ID
替换为run_id
tasks states-for-dag-run
命令。例如2024-02-17T15:38:38.969307+00:00
。
输出示例:
...
Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task
...
Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746
...
Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check
清理
为避免因本页面中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的 Google Cloud 项目。
删除本教程中使用的资源:
删除 Cloud Composer 环境:
在 Google Cloud 控制台中,前往环境页面。
选择
example-environment
,并点击删除。等待环境删除完成。
删除环境的存储桶。删除 Cloud Composer 环境不会删除其存储分区。
在 Google Cloud 控制台中,依次前往存储 > 浏览器页面。
选择环境的存储桶,然后点击删除。例如,此存储桶可以命名为
us-central1-example-environ-c1616fe8-bucket
。
删除您的环境的 Redis 队列的永久性磁盘。删除 Cloud Composer 环境并不会删除其永久性磁盘。
在 Google Cloud 控制台中,依次选择 Compute Engine > 磁盘。
选择环境的 Redis 队列永久性磁盘,然后点击删除。
例如,此磁盘可以命名为
pvc-02bc4842-2312-4347-8519-d87bdcd31115
。适用于 Cloud Composer 2 的磁盘始终为Balanced persistent disk
类型,大小为 2 GB。
后续步骤