在 Cloud Composer 1 中运行 Apache Airflow DAG (Google Cloud CLI)

Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3

本快速入门指南介绍了如何创建 Cloud Composer 环境 并在 Cloud Composer 1 中运行 Apache Airflow DAG。

准备工作

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 安装 Google Cloud CLI。
  3. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. 确保您的 Google Cloud 项目已启用结算功能

  6. 安装 Google Cloud CLI。
  7. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  8. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  9. 确保您的 Google Cloud 项目已启用结算功能

  10. Enable the Cloud Composer API:

    gcloud services enable composer.googleapis.com
  11. 如需获得完成本快速入门所需的权限,请让您的管理员为您授予项目的以下 IAM 角色:

    如需详细了解如何授予角色,请参阅管理访问权限

    您也可以通过自定义角色或其他预定义角色来获取所需的权限。

创建环境

us-central1 中创建名为 example-environment 的新环境。 使用最新的 Cloud Composer 1 区域

gcloud composer environments create example-environment \
    --location us-central1 \
    --image-version composer-1.20.12-airflow-1.10.15

创建 DAG 文件

Airflow DAG 是有序任务的集合, 您想定期生成报表DAG 在标准 Python 文件中定义。

本指南使用的是 quickstart.py 文件中定义的示例 Airflow DAG。 此文件中的 Python 代码将执行以下操作:

  1. 创建一个 DAG composer_sample_dag。此 DAG 每天运行一次。
  2. 执行一项 print_dag_run_conf 任务。该任务使用 bash 运算符输出 DAG 运行的配置。

在本地机器上保存 quickstart.py 文件的副本:

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

将 DAG 文件上传到环境的存储桶

每个 Cloud Composer 环境都有一个 Cloud Storage 存储桶仅限 Cloud Composer 时间表中的 Airflow 位于此存储桶的 /dags 文件夹中的 DAG。

如需安排您的 DAG,请将 quickstart.py 从本地机器上传到您的 环境的 /dags 文件夹:

如需使用 Google Cloud CLI 上传 quickstart.py,请在以下位置运行以下命令: quickstart.py 文件所在的文件夹:

gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py

查看 DAG

上传 DAG 文件后,Airflow 会执行以下操作:

  1. 解析您上传的 DAG 文件。系统可能需要几分钟时间 以供 Airflow 使用。
  2. 将 DAG 添加到可用 DAG 列表中。
  3. 根据您在 DAG 文件中提供的时间表执行 DAG。

检查您的 DAG 是否得到处理且没有任何错误,以及是否通过 在 DAG 界面中查看它。DAG 界面是 Cloud Composer 界面,可用于查看 Google Cloud 控制台中的 DAG 信息。Cloud Composer 还提供 访问 Airflow 界面,它是原生 Airflow Web 应用 界面。

  1. 等待大约 5 分钟,让 Airflow 有时间处理 DAG 文件 完成第一次 DAG 运行 (稍后介绍)。

  2. 在 Google Cloud CLI 中运行以下命令。此命令将执行 dags list Airflow CLI 命令,用于列出 环境

    gcloud composer environments run example-environment \
    --location us-central1 \
    dags list
    
  3. 检查 composer_quickstart DAG 是否列在命令的输出中。

    输出示例:

    Executing the command: [ airflow dags list ]...
    Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb
    Use ctrl-c to interrupt the command
    dag_id              | filepath              | owner            | paused
    ====================+=======================+==================+=======
    airflow_monitoring  | airflow_monitoring.py | airflow          | False
    composer_quickstart | dag-quickstart-af2.py | Composer Example | False
    

查看 DAG 运行详情

DAG 的单次执行称为“DAG 运行”。Airflow 立即 针对示例 DAG 执行 DAG 运行,因为 DAG 文件中的开始日期是 设为昨天。这样,Airflow 能够与指定的 DAG 同步 时间表。

示例 DAG 包含一项 print_dag_run_conf 任务,该任务运行 echo 命令。此命令会输出有关 DAG 的元信息 (DAG 运行的数字标识符)。

在 Google Cloud CLI 中运行以下命令。此命令会列出 DAG 运行 对于 composer_quickstart DAG:

gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart

输出示例:

dag_id              | run_id                                      | state   | execution_date                   | start_date                       | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00

Airflow CLI 未提供查看任务日志的命令。您可以使用 用于查看 Airflow 任务日志的其他方法: Cloud Composer DAG 界面、Airflow 界面或 Cloud Logging。本指南 展示了如何在 Cloud Logging 中查询来自特定 DAG 运行的日志。

在 Google Cloud CLI 中运行以下命令。此命令可从 composer_quickstart DAG 的特定 DAG 运行作业的 Cloud Logging。通过 --format 参数设置输出的格式,使其仅包含日志消息的文本。 。

gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"

您需要将其中的:

  • RUN_ID 替换为run_id tasks states-for-dag-run 命令。例如 2024-02-17T15:38:38.969307+00:00

输出示例:

...

Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task

...

Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746

...

Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check

清理

为避免因本页面中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的 Google Cloud 项目。

删除本教程中使用的资源

  1. 删除 Cloud Composer 环境:

    1. 在 Google Cloud 控制台中,前往环境页面。

      转到“环境”

    2. 选择 example-environment,并点击删除

    3. 等待环境删除完成。

  2. 删除环境的存储桶。删除 Cloud Composer 环境不会删除其存储分区。

    1. 在 Google Cloud 控制台中,前往存储 >浏览器页面。

      转到“存储”>“浏览器”

    2. 选择环境的存储桶,然后点击删除。例如,此存储桶可以命名为 us-central1-example-environ-c1616fe8-bucket

  3. 删除您的环境的 Redis 队列的永久性磁盘。删除 Cloud Composer 环境并不会删除其永久性磁盘。

    1. 在 Google Cloud 控制台中,前往 Compute Engine >磁盘

      转到“磁盘”

    2. 选择环境的 Redis 队列永久性磁盘,然后点击删除

      例如,此磁盘可以命名为 gke-us-central1-exampl-pvc-b12055b6-c92c-43ff-9de9-10f2cc6fc0ee。适用于 Cloud Composer 1 的磁盘始终为 Standard persistent disk 类型,大小为 2 GB。

后续步骤