添加和更新 DAG

Cloud Composer 1 | Cloud Composer 2

本页面介绍如何在 Cloud Composer 环境中管理 DAG。

Cloud Composer 使用 Cloud Storage 存储分区来存储 Cloud Composer 环境的 DAG。您的环境会将此存储分区中的 DAG 同步到 Airflow 工作器和调度器等 Airflow 组件

准备工作

  • 由于 Apache Airflow 不提供强大的 DAG 隔离功能,建议您分开维护生产环境和测试环境,以防止产生 DAG 干扰。如需了解详情,请参阅测试 DAG
  • 确保您的帐号拥有足够的权限来管理 DAG。
  • DAG 的更改会在 3-5 分钟内传播到 Airflow。您可以在 Airflow 网页界面中查看任务状态。

访问环境的存储分区

如需访问与您的环境关联的存储分区,请执行以下操作:

控制台

  1. 在 Google Cloud Console 中,转到环境页面。

    转到“环境”

  2. 在环境列表中,找到包含环境名称的行,然后在 DAG 文件夹列中点击 DAG 链接。系统会打开存储分区详情页面。它会显示环境存储分区中 /dags 文件夹的内容。

gcloud

gcloud CLI 包含单独的命令,可用于在环境的存储分区中添加删除 DAG。

如果您希望与环境的存储分区进行互动,也可以使用 gsutil 命令行工具。如需获取环境存储分区的地址,请运行以下 gcloud CLI 命令:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="get(config.dagGcsPrefix)"

替换:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。

示例:

gcloud beta composer environments describe example-environment \
    --location us-central1 \
    --format="get(config.dagGcsPrefix)"

API

构建 environments.get API 请求。在 Environment 资源的 EnvironmentConfig 资源中,dagGcsPrefix 资源是您的环境的存储分区的地址。

示例:

GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment

Python

使用 google-auth 库获取凭据,并使用 requests 库调用 REST API。

import google.auth
import google.auth.transport.requests

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=['https://www.googleapis.com/auth/cloud-platform'])
authed_session = google.auth.transport.requests.AuthorizedSession(
    credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    'https://composer.googleapis.com/v1beta1/projects/{}/locations/{}'
    '/environments/{}').format(project_id, location, composer_environment)
response = authed_session.request('GET', environment_url)
environment_data = response.json()

# Print the bucket name from the response body.
print(environment_data['config']['dagGcsPrefix'])

添加或更新 DAG

如需添加或更新 DAG,请将该 DAG 的 Python .py 文件移至环境存储分区的 /dags 文件夹中。

控制台

  1. 在 Google Cloud Console 中,转到环境页面。

    转到“环境”

  2. 在环境列表中,找到包含环境名称的行,然后在 DAG 文件夹列中点击 DAG 链接。系统会打开存储分区详情页面。它会显示环境存储分区中 /dags 文件夹的内容。

  3. 点击上传文件。然后,使用浏览器对话框为此 DAG 选择 Python .py 文件并进行确认。

gcloud

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source="LOCAL_FILE_TO_UPLOAD"

替换:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。
  • LOCAL_FILE_TO_UPLOAD 是 DAG 的 Python .py 文件。

示例:

gcloud beta composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    --source="example_dag.py"

更新具有活跃 DAG 运行作业的 DAG

如果您更新具有活跃 DAG 运行作业的 DAG:

  • 所有当前正在执行的任务都会使用原始 DAG 文件继续运行。
  • 已安排但尚未运行的所有任务都会使用更新后的 DAG 文件。
  • 更新后的 DAG 文件中不再存在的所有任务都会标记为已移除。

更新频繁运行的 DAG

上传 DAG 文件后,Airflow 需要一些时间才能加载此文件并更新该 DAG。如果您的 DAG 频繁运行,您可能需要确保该 DAG 使用新版 DAG 文件。为此,请执行以下操作:

  1. 在 Airflow 界面中暂停 DAG。
  2. 上传更新后的 DAG 文件。
  3. 请等待,直到您在 Airflow 界面中看到更新。这意味着调度器已正确解析 DAG,并在 Airflow 数据库中进行了更新。

    即使 Airflow 界面显示更新后的 DAG,也无法保证 Airflow 工作器拥有更新后的 DAG 文件版本。这是因为 DAG 文件不与调度器和工作器一起同步。

  4. 您可能需要延长等待时间,以确保 DAG 文件与环境中的所有工作器同步。系统每分钟执行多次同步。在运行状况良好的环境中,等待大约 20-30 秒足以让所有工作器完成同步。

  5. (可选)如果要完全确保所有工作器都具有新版 DAG 文件,请检查每个工作器的日志。为此,请执行以下操作:

    1. 在 Google Cloud Console 中,打开您的环境的日志标签页。

    2. 转到 Composer 日志 > 基础架构 > Cloud Storage 同步项,然后检查环境中每个工作器的日志。查找时间戳在您上传新 DAG 文件之后的最新 Syncing dags directory 日志项。如果您看到后面跟随 Finished syncing 项,则相应 DAG 在此工作器上成功同步。

  6. 取消暂停 DAG。

删除 DAG

本部分介绍如何删除 DAG。

在您的环境中删除 DAG

如需删除 DAG,请从环境的存储分区中移除相应环境的 /dags 文件夹中的 DAG .py 文件。

控制台

  1. 在 Google Cloud Console 中,转到环境页面。

    转到“环境”

  2. 在环境列表中,找到包含环境名称的行,然后在 DAG 文件夹列中点击 DAG 链接。系统会打开存储分区详情页面。它会显示环境存储分区中 /dags 文件夹的内容。

  3. 选择 DAG 文件,点击删除并确认操作。

gcloud

gcloud composer environments storage dags delete \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    DAG_FILE

替换:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。
  • DAG_FILE 替换为 DAG 的 Python .py 文件。

示例:

gcloud beta composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    example_dag.py

从 Airflow 界面中移除 DAG

如需从 Airflow 网页界面中移除 DAG 元数据,请执行以下操作:

Airflow 界面

  1. 转到您环境的 Airflow 界面
  2. 对于 DAG,点击删除 DAG

gcloud

在 gcloud CLI 中运行以下命令:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags delete -- DAG_NAME

替换:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。
  • DAG_NAME 是要删除的 DAG 的名称。

后续步骤