在 Cloud Composer 3 中运行 Apache Airflow DAG (Google Cloud CLI)
Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本快速入门指南介绍了如何在 Cloud Composer 3 中创建 Cloud Composer 环境并运行 Apache Airflow DAG。
如果您刚接触 Airflow,请参阅 Apache Airflow 文档中的 Airflow 概念教程,详细了解 Airflow 的概念、对象及其用法。
如果您想改用 Google Cloud 控制台,请参阅在 Cloud Composer 中运行 Apache Airflow DAG。
如果您想使用 Terraform 创建环境,请参阅创建环境 (Terraform)。
准备工作
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Cloud Composer API:
gcloud services enable composer.googleapis.com
-
如需获得完成本快速入门所需的权限,请让您的管理员为您授予项目的以下 IAM 角色:
-
如需查看、创建和管理 Cloud Composer 环境,请执行以下操作:
-
Environment and Storage Object Administrator (
roles/composer.environmentAndStorageObjectAdmin
) -
Service Account User (
roles/iam.serviceAccountUser
)
-
Environment and Storage Object Administrator (
-
如需查看日志,请使用以下角色:
Logs Viewer (
roles/logging.viewer
)
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
-
如需查看、创建和管理 Cloud Composer 环境,请执行以下操作:
创建环境的服务账号
创建环境时,您需要指定服务账号。此服务账号称为“环境的服务账号”。您的环境会使用此服务账号执行大多数操作。
您的环境的服务账号不是用户账号。服务账号是由应用或虚拟机 (VM) 实例(而非单个用户)使用的特殊账号。
如需为您的环境创建服务账号,请执行以下操作:
按照 Identity and Access Management 文档中的说明创建新的服务账号。
向该服务授予角色,如 Identity and Access Management 文档中所述。所需的角色为 Composer Worker (
composer.worker
)。
创建环境
在 us-central1
区域中创建一个名为 example-environment
的新环境,并使用最新的 Cloud Composer 3 版本。
gcloud composer environments create example-environment \
--location us-central1 \
--image-version composer-3-airflow-2.10.2-build.7
创建 DAG 文件
Airflow DAG 是您要安排和运行的有序任务的集合。DAG 在标准 Python 文件中定义。
本指南使用 quickstart.py
文件中定义的 Airflow DAG 示例。此文件中的 Python 代码会执行以下操作:
- 创建一个 DAG
composer_sample_dag
。此 DAG 每天运行一次。 - 执行一项
print_dag_run_conf
任务。该任务使用 bash 运算符输出 DAG 运行的配置。
将 quickstart.py
文件的副本保存在本地机器上:
将 DAG 文件上传到环境的存储分区
每个 Cloud Composer 环境都有一个关联的 Cloud Storage 存储分区。Cloud Composer 中的 Airflow 只会安排位于此存储分区的 /dags
文件夹中的 DAG。
如需安排 DAG,请将 quickstart.py
从本地机器上传到您的环境的 /dags
文件夹:
如需使用 Google Cloud CLI 上传 quickstart.py
,请在 quickstart.py
文件所在的文件夹中运行以下命令:
gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py
查看 DAG
上传 DAG 文件后,Airflow 会执行以下操作:
- 解析您上传的 DAG 文件。DAG 可能需要几分钟才能供 Airflow 使用。
- 将 DAG 添加到可用 DAG 列表中。
- 根据您在 DAG 文件中提供的时间表执行 DAG。
在 DAG 界面中查看 DAG,检查 DAG 是否已处理完毕且是否可在 Airflow 中使用。DAG 界面是 Cloud Composer 界面,用于在 Google Cloud 控制台中查看 DAG 信息。Cloud Composer 还提供对 Airflow 界面的访问权限,该界面是原生 Airflow Web 界面。
请等待大约五分钟,以便 Airflow 处理您之前上传的 DAG 文件,并完成首次 DAG 运行(稍后会加以说明)。
在 Google Cloud CLI 中运行以下命令。此命令会执行
dags list
Airflow CLI 命令,用于列出环境中的 DAG。gcloud composer environments run example-environment \ --location us-central1 \ dags list
检查命令的输出中是否列出了
composer_quickstart
DAG。输出示例:
Executing the command: [ airflow dags list ]... Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb Use ctrl-c to interrupt the command dag_id | filepath | owner | paused ====================+=======================+==================+======= airflow_monitoring | airflow_monitoring.py | airflow | False composer_quickstart | dag-quickstart-af2.py | Composer Example | False
查看 DAG 运行作业详情
DAG 的单次执行称为 DAG 运行。由于 DAG 文件中的开始日期设置为昨天,因此 Airflow 会立即为示例 DAG 执行 DAG 运行。这样,Airflow 就可以赶上指定 DAG 的时间安排。
示例 DAG 包含一个任务 print_dag_run_conf
,该任务会在控制台中运行 echo
命令。此命令会输出有关 DAG 的元信息(DAG 运行的数字标识符)。
在 Google Cloud CLI 中运行以下命令。以下命令会列出 composer_quickstart
DAG 的 DAG 运行作业:
gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart
输出示例:
dag_id | run_id | state | execution_date | start_date | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00
Airflow CLI 不提供用于查看任务日志的命令。您可以使用其他方法查看 Airflow 任务日志:Cloud Composer DAG 界面、Airflow 界面或 Cloud Logging。本指南介绍了如何在 Cloud Logging 中查询特定 DAG 运行的日志。
在 Google Cloud CLI 中运行以下命令。此命令会从 Cloud Logging 中读取 composer_quickstart
DAG 的特定 DAG 运行的日志。--format
参数会设置输出格式,以便仅显示日志消息的文本。
gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"
您需要进行如下替换:
- 将
RUN_ID
替换为之前运行的tasks states-for-dag-run
命令的输出中的run_id
值。例如2024-02-17T15:38:38.969307+00:00
。
输出示例:
...
Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task
...
Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746
...
Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check
清理
为避免因本页面中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的 Google Cloud 项目。
删除本教程中使用的资源:
删除 Cloud Composer 环境:
在 Google Cloud 控制台中,前往环境页面。
选择
example-environment
,并点击删除。等待环境删除完成。
删除环境的存储桶。删除 Cloud Composer 环境不会删除其存储分区。
在 Google Cloud 控制台中,依次前往存储 > 浏览器页面。
选择环境的存储桶,然后点击删除。例如,此存储桶可以命名为
us-central1-example-environ-c1616fe8-bucket
。
后续步骤