添加和更新 DAG

Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3

本页面介绍了如何在 Cloud Composer 中管理 DAG 环境

Cloud Composer 使用 Cloud Storage 存储分区来存储 Cloud Composer 环境的 DAG。您的环境会将 DAG 从此存储桶同步到 Airflow 组件,例如 Airflow 工作器和调度器。

准备工作

  • 由于 Apache Airflow 不提供强大的 DAG 隔离功能,建议您分开维护生产环境和测试环境,以防止产生 DAG 干扰。如需了解详情,请参阅测试 DAG
  • 确保您的账号拥有足够的权限, 管理 DAG。
  • 对 DAG 的更改会在 3-5 分钟内传播到 Airflow。您可以在 Airflow 网页界面中查看任务状态。

访问您的环境的存储桶

如需访问与您的环境关联的存储桶,请执行以下操作:

控制台

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,找到包含环境名称的行,然后在 DAG 文件夹列中,点击 DAG 链接。通过 存储分区详情页面即会打开。它会显示环境存储桶中 /dags 文件夹的内容。

gcloud

gcloud CLI 通过单独的命令 在环境的存储桶中添加删除 DAG。

如果您想与环境的存储桶进行交互,还可以使用 使用 Google Cloud CLI。如需获取环境存储桶的地址,请运行以下 gcloud CLI 命令:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="get(config.dagGcsPrefix)"

您需要将其中的:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。

例如:

gcloud beta composer environments describe example-environment \
    --location us-central1 \
    --format="get(config.dagGcsPrefix)"

API

构建 environments.get API 请求。在 Environment 资源,在 EnvironmentConfig 资源,位于 dagGcsPrefix 资源是您环境的存储桶的地址。

示例:

GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment

Python

使用 google-auth 库 获取凭据并使用 requests 库 调用 REST API。

import google.auth
import google.auth.transport.requests

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
response = authed_session.request("GET", environment_url)
environment_data = response.json()

# Print the bucket name from the response body.
print(environment_data["config"]["dagGcsPrefix"])

添加或更新 DAG

如需添加或更新 DAG,请将 DAG 的 Python .py 文件移至 环境存储桶中的 /dags 文件夹。

控制台

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,找到包含环境名称的行,然后在 DAG 文件夹列中,点击 DAG 链接。通过 存储分区详情页面即会打开。它会显示环境存储桶中 /dags 文件夹的内容。

  3. 点击上传文件。然后,为 DAG 选择 Python .py 文件 使用浏览器的对话框确认即可。

gcloud

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source="LOCAL_FILE_TO_UPLOAD"

替换:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。
  • LOCAL_FILE_TO_UPLOAD 是 DAG 的 Python .py 文件。

示例:

gcloud composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    --source="example_dag.py"

更新具有活跃 DAG 运行作业的 DAG

如果您更新具有活跃 DAG 运行作业的 DAG:

  • 所有当前正在执行的任务都会使用原始 DAG 文件继续运行。
  • 已安排但尚未运行的所有任务都会使用更新后的 DAG 文件。
  • 更新后的 DAG 文件中不再存在的所有任务均标记为 已移除。

更新频繁运行的 DAG

上传 DAG 文件后,Airflow 需要一些时间来加载此文件并更新此 DAG。如果您的 DAG 频繁运行,建议您进行如下设置: 确保 DAG 使用更新后的 DAG 文件版本。为此,请执行以下操作:

  1. 在 Airflow 界面中暂停 DAG。

  2. 上传更新后的 DAG 文件。

  3. 请等待,直到您在 Airflow 界面中看到更新。这意味着调度器已正确解析 DAG,并在 Airflow 数据库中进行了更新。

    即使 Airflow 界面显示更新后的 DAG,也无法保证 Airflow 工作器拥有更新后的 DAG 文件版本。这是因为 DAG 文件不与调度器和工作器一起同步。

  4. 您可能需要延长等待时间,以确保 DAG 文件与环境中的所有工作器同步。系统每分钟执行多次同步。在运行状况良好的环境中,等待大约 20-30 秒足以让所有工作器完成同步。

  5. (可选)如果要完全确保所有工作器都具有新版 DAG 文件,请检查每个工作器的日志。为此,请执行以下操作:

    1. 在 Google Cloud 控制台中打开环境的日志标签页。

    2. 转到 Composer 日志 > 基础架构 > Cloud Storage 同步项并检查以下集群中每个工作器的日志 您的环境查找最新的Syncing dags directory日志 时间戳在您上传新 DAG 文件之后的条目。如果您 其后面的 Finished syncing 项,则 DAG 已成功在此工作器上同步。

  6. 取消暂停 DAG。

在您的环境中删除 DAG

如需删除 DAG,请从以下位置移除 DAG 的 Python .py 文件: 环境的存储桶中,创建环境的 /dags 文件夹。

控制台

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,找到包含您的环境名称的行 在 DAGs 文件夹列中,点击 DAGs 链接。存储桶详情页面随即打开。它会显示环境存储桶中 /dags 文件夹的内容。

  3. 选择 DAG 文件,点击删除,然后确认操作。

gcloud

gcloud composer environments storage dags delete \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    DAG_FILE

替换:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。
  • DAG_FILE 替换为 DAG 的 Python .py 文件。

示例:

gcloud composer environments storage dags delete \
    --environment example-environment \
    --location us-central1 \
    example_dag.py

从 Airflow 界面中移除 DAG

如需从 Airflow 网页界面中移除 DAG 元数据,请执行以下操作:

Airflow 界面

  1. 前往您的环境的 Airflow 界面
  2. 对于 DAG,点击删除 DAG

gcloud

在早于 1.14.0 的 Airflow 1 版本中,运行以下命令: gcloud CLI:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    delete_dag -- DAG_NAME

在 Airflow 2、Airflow 1.14.0 及更高版本中,运行以下命令: gcloud CLI:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags delete -- DAG_NAME

您需要将其中的:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。
  • DAG_NAME 是要删除的 DAG 的名称。

后续步骤