添加和更新 DAG

Cloud Composer 1 | Cloud Composer 2

本页面介绍如何在 Cloud Composer 环境中管理 DAG。

Cloud Composer 使用 Cloud Storage 存储桶来存储 Cloud Composer 环境的 DAG。您的环境会将此存储桶中的 DAG 同步到 Airflow 组件,例如 Airflow 工作器和调度器。

准备工作

  • 由于 Apache Airflow 不提供强大的 DAG 隔离功能,建议您分开维护生产环境和测试环境,以防止产生 DAG 干扰。如需了解详情,请参阅测试 DAG
  • 请确保您的帐号具有足够的权限来管理 DAG。
  • 对 DAG 的更改会在 3-5 分钟内传播到 Airflow。您可以在 Airflow 网页界面中查看任务状态。

访问环境的存储桶

如需访问与您的环境关联的存储桶,请执行以下操作:

控制台

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,找到包含您的环境名称的行,然后在 DAGs 文件夹列中点击 DAGs 链接。此时系统会打开存储分区详情页面。它显示了环境存储桶中 /dags 文件夹的内容。

gcloud

gcloud CLI 提供了单独的命令,供您在环境的存储桶中添加删除 DAG。

如果要与环境的存储桶交互,还可以使用 gsutil 命令行工具。如需获取环境存储桶的地址,请运行以下 gcloud CLI 命令:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="get(config.dagGcsPrefix)"

您需要在其中:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。

示例:

gcloud beta composer environments describe example-environment \
    --location us-central1 \
    --format="get(config.dagGcsPrefix)"

API

构建 environments.get API 请求。在 Environment 资源的 EnvironmentConfig 资源中,dagGcsPrefix 资源为环境存储桶的地址。

示例:

GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment

Python

使用 google-auth 库获取凭据,并使用 requests 库来调用 REST API。

import google.auth
import google.auth.transport.requests

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
response = authed_session.request("GET", environment_url)
environment_data = response.json()

# Print the bucket name from the response body.
print(environment_data["config"]["dagGcsPrefix"])

添加或更新 DAG

如需添加或更新 DAG,请将 DAG 的 Python .py 文件移至环境存储桶中的 /dags 文件夹。

控制台

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,找到包含您的环境名称的行,然后在 DAGs 文件夹列中点击 DAGs 链接。此时系统会打开存储分区详情页面。它显示了环境存储桶中 /dags 文件夹的内容。

  3. 点击上传文件。然后使用浏览器的对话框为该 DAG 选择 Python .py 文件并确认。

gcloud

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source="LOCAL_FILE_TO_UPLOAD"

您需要在其中:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。
  • LOCAL_FILE_TO_UPLOAD 是 DAG 的 Python .py 文件。

示例:

gcloud beta composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    --source="example_dag.py"

更新具有活跃 DAG 运行作业的 DAG

如果您更新具有活跃 DAG 运行作业的 DAG:

  • 所有当前正在执行的任务都会使用原始 DAG 文件继续运行。
  • 已安排但尚未运行的所有任务都会使用更新后的 DAG 文件。
  • 所有不再存在于更新后的 DAG 文件中的任务都将标记为“已移除”。

更新频繁运行的 DAG

上传 DAG 文件后,Airflow 需要一段时间才能加载此文件并更新 DAG。如果您的 DAG 按频繁的时间表运行,您可能需要确保 DAG 使用更新后的 DAG 文件。为此,请执行以下操作:

  1. 在 Airflow 界面中暂停 DAG。
  2. 上传更新后的 DAG 文件。
  3. 请等待,直到您在 Airflow 界面中看到更新。这意味着调度器已正确解析 DAG,并在 Airflow 数据库中进行了更新。

    即使 Airflow 界面显示更新后的 DAG,也无法保证 Airflow 工作器拥有更新后的 DAG 文件版本。这是因为 DAG 文件不与调度器和工作器一起同步。

  4. 您可能需要延长等待时间,以确保 DAG 文件与环境中的所有工作器同步。系统每分钟执行多次同步。在运行状况良好的环境中,等待大约 20-30 秒足以让所有工作器完成同步。

  5. (可选)如果要完全确保所有工作器都具有新版 DAG 文件,请检查每个工作器的日志。为此,请执行以下操作:

    1. 在 Google Cloud 控制台中打开环境的日志标签页。

    2. 转到 Composer 日志 > 基础架构 > Cloud Storage 同步项,然后检查环境中每个工作器的日志。查找时间戳在您上传新 DAG 文件之后的最新 Syncing dags directory 日志项。如果您看到后面跟随 Finished syncing 项,则相应 DAG 在此工作器上成功同步。

  6. 取消暂停 DAG。

删除 DAG

本部分介绍如何删除 DAG。

在您的环境中删除 DAG

如需删除某个 DAG,请从环境存储桶中的 /dags 文件夹中移除该 DAG 的 Python .py 文件。

控制台

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,找到包含您的环境名称的行,然后在 DAGs 文件夹列中点击 DAGs 链接。此时系统会打开存储分区详情页面。它显示了环境存储桶中 /dags 文件夹的内容。

  3. 选择 DAG 文件,点击删除并确认操作。

gcloud

gcloud composer environments storage dags delete \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    DAG_FILE

您需要在其中:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。
  • DAG_FILE 替换为 DAG 的 Python .py 文件。

示例:

gcloud composer environments storage dags delete \
    --environment example-environment \
    --location us-central1 \
    example_dag.py

从 Airflow 界面中移除 DAG

在 Airflow 1.10.0 或更高版本中,您可以从 Airflow 界面中移除 DAG。

如需从 Airflow 网页界面中移除 DAG 元数据,请执行以下操作:

Airflow 界面

  1. 转到您环境的 Airflow 界面
  2. 对于相应的 DAG,点击删除 DAG

gcloud

在 1.14.0 之前的 Airflow 1 版本中,在 gcloud CLI 中运行以下命令:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    delete_dag -- DAG_NAME

在 Airflow 2、Airflow 1.14.0 及更高版本中,请在 gcloud CLI 中运行以下命令:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags delete -- DAG_NAME

您需要在其中:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。
  • DAG_NAME 是要删除的 DAG 的名称。

后续步骤