在 Cloud Composer 1 中运行 Apache Airflow DAG (Google Cloud CLI)

Cloud Composer 1 | Cloud Composer 2

本快速入门指南介绍了如何在 Cloud Composer 1 中创建 Cloud Composer 环境和运行 Apache Airflow DAG。

准备工作

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 安装 Google Cloud CLI。
  3. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  4. 创建或选择 Google Cloud 项目

    • 创建 Google Cloud 项目:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替换为您要创建的 Google Cloud 项目的名称。

    • 选择您创建的 Google Cloud 项目:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替换为您的 Google Cloud 项目 名称。

  5. 确保您的 Google Cloud 项目已启用结算功能

  6. 安装 Google Cloud CLI。
  7. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  8. 创建或选择 Google Cloud 项目

    • 创建 Google Cloud 项目:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替换为您要创建的 Google Cloud 项目的名称。

    • 选择您创建的 Google Cloud 项目:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替换为您的 Google Cloud 项目 名称。

  9. 确保您的 Google Cloud 项目已启用结算功能

  10. Enable the Cloud Composer API:

    gcloud services enable composer.googleapis.com
  11. 如需获取完成本快速入门所需的权限,请让管理员授予您项目的以下 IAM 角色:

    如需详细了解如何授予角色,请参阅管理访问权限

    您也可以通过自定义角色或其他预定义角色来获取所需的权限。

创建环境

us-central1 区域中创建一个名为 example-environment 的新环境,并使用最新的 Cloud Composer 1 版本

gcloud composer environments create example-environment \
    --location us-central1 \
    --image-version composer-1.20.12-airflow-1.10.15

创建 DAG 文件

Airflow DAG 是您要安排和运行的有序任务的集合。DAG 在标准 Python 文件中定义。

本指南使用 quickstart.py 文件中定义的 Airflow DAG 示例。 此文件中的 Python 代码将执行以下操作:

  1. 创建一个 DAG composer_sample_dag。此 DAG 每天运行。
  2. 执行一项 print_dag_run_conf 任务。该任务使用 bash 运算符输出 DAG 运行的配置。

在本地机器上保存一个 quickstart.py 文件的副本:

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

将该 DAG 文件上传到环境的存储桶

每个 Cloud Composer 环境都有一个与之关联的 Cloud Storage 存储桶。Cloud Composer 中的 Airflow 只会调度此存储桶的 /dags 文件夹中的 DAG。

如需调度您的 DAG,请将 quickstart.py 从本地机器上传到环境的 /dags 文件夹:

如需使用 Google Cloud CLI 上传 quickstart.py,请在 quickstart.py 文件所在的文件夹中运行以下命令:

gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py

查看 DAG

上传 DAG 文件后,Airflow 会执行以下操作:

  1. 解析您上传的 DAG 文件。DAG 可能需要几分钟时间才能供 Airflow 使用。
  2. 将该 DAG 添加到可用 DAG 列表中。
  3. 根据您在 DAG 文件中提供的时间表执行 DAG。

检查您的 DAG 是否经过处理且正确无误,以及是否在 DAG 界面中查看并在 Airflow 中可用。DAG 界面是 Cloud Composer 界面,用于在 Google Cloud 控制台中查看 DAG 信息。此外,Cloud Composer 还可以访问 Airflow 界面,该界面是原生 Airflow 网页界面。

  1. 等待大约五分钟,让 Airflow 有时间处理您之前上传的 DAG 文件并完成首次 DAG 运行(稍后会进行说明)。

  2. 在 Google Cloud CLI 中运行以下命令。此命令会执行 dags list Airflow CLI 命令,该命令可列出您环境中的 DAG。

    gcloud composer environments run example-environment \
    --location us-central1 \
    dags list
    
  3. 检查命令的输出中是否列出了 composer_quickstart DAG。

    输出示例:

    Executing the command: [ airflow dags list ]...
    Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb
    Use ctrl-c to interrupt the command
    dag_id              | filepath              | owner            | paused
    ====================+=======================+==================+=======
    airflow_monitoring  | airflow_monitoring.py | airflow          | False
    composer_quickstart | dag-quickstart-af2.py | Composer Example | False
    

查看 DAG 运行详情

DAG 的单次执行称为“DAG 运行”。由于 DAG 文件中的开始日期设置为昨天,因此 Airflow 会立即为示例 DAG 执行 DAG 运行。这样,Airflow 便可以赶上指定 DAG 的时间表。

示例 DAG 包含一个任务 print_dag_run_conf,该任务会在控制台中运行 echo 命令。此命令会输出有关 DAG 的元信息(DAG 运行的数字标识符)。

在 Google Cloud CLI 中运行以下命令。以下命令会列出 composer_quickstart DAG 的 DAG 运行作业:

gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart

输出示例:

dag_id              | run_id                                      | state   | execution_date                   | start_date                       | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00

Airflow CLI 未提供查看任务日志的命令。您可以使用其他方法查看 Airflow 任务日志:Cloud Composer DAG 界面、Airflow 界面或 Cloud Logging。本指南介绍如何在 Cloud Logging 中查询来自特定 DAG 运行的日志。

在 Google Cloud CLI 中运行以下命令。此命令会从 Cloud Logging 读取 composer_quickstart DAG 的特定 DAG 运行的日志。--format 参数会设置输出的格式,以便仅显示日志消息的文本。

gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"

您需要在其中:

  • RUN_ID 替换为您之前运行的 tasks states-for-dag-run 命令的输出中的 run_id 值。例如 2024-02-17T15:38:38.969307+00:00

输出示例:

...

Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task

...

Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746

...

Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check

清理

为避免因本页面中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的 Google Cloud 项目。

删除本教程中使用的资源

  1. 删除 Cloud Composer 环境:

    1. 在 Google Cloud 控制台中,前往环境页面。

      转到“环境”

    2. 选择 example-environment,并点击删除

    3. 等待环境删除完成。

  2. 删除环境的存储桶。删除 Cloud Composer 环境不会删除其存储桶。

    1. 在 Google Cloud 控制台中,转到存储 > 浏览器页面。

      转到“存储”>“浏览器”

    2. 选择环境的存储桶,然后点击删除。例如,此存储桶可以命名为 us-central1-example-environ-c1616fe8-bucket

  3. 删除您的环境的 Redis 队列的永久性磁盘。删除 Cloud Composer 环境并不会删除其永久性磁盘。

    1. 在 Google Cloud 控制台中,前往 Compute Engine > 磁盘

      转到“磁盘”

    2. 选择环境的 Redis 队列永久性磁盘,然后点击删除

      例如,此磁盘可以命名为 gke-us-central1-exampl-pvc-b12055b6-c92c-43ff-9de9-10f2cc6fc0ee。适用于 Cloud Composer 1 的磁盘始终为 Standard persistent disk 类型,大小为 2 GB。

后续步骤