Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3
本页面介绍了如何在 Cloud Composer 中管理 DAG 环境
Cloud Composer 使用 Cloud Storage 存储分区来存储 Cloud Composer 环境的 DAG。您的环境会将 DAG 从此存储桶同步到 Airflow 组件,例如 Airflow 工作器和调度器。
准备工作
- 由于 Apache Airflow 不提供强大的 DAG 隔离功能,建议您分开维护生产环境和测试环境,以防止产生 DAG 干扰。如需了解详情,请参阅测试 DAG。
- 确保您的账号拥有足够的权限, 管理 DAG。
- 对 DAG 的更改会在 3-5 分钟内传播到 Airflow。您可以在 Airflow 网页界面中查看任务状态。
访问您的环境的存储桶
如需访问与您的环境关联的存储桶,请执行以下操作:
控制台
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,找到包含环境名称的行,然后在 DAG 文件夹列中,点击 DAG 链接。通过 存储分区详情页面即会打开。它会显示环境存储桶中
/dags
文件夹的内容。
gcloud
gcloud CLI 通过单独的命令 在环境的存储桶中添加和删除 DAG。
如果您想与环境的存储桶进行交互,还可以使用 使用 Google Cloud CLI。如需获取环境存储桶的地址,请运行以下 gcloud CLI 命令:
gcloud composer environments describe ENVIRONMENT_NAME \
--location LOCATION \
--format="get(config.dagGcsPrefix)"
您需要将其中的:
ENVIRONMENT_NAME
替换为环境的名称。- 将
LOCATION
替换为环境所在的区域。
例如:
gcloud beta composer environments describe example-environment \
--location us-central1 \
--format="get(config.dagGcsPrefix)"
API
构建 environments.get
API 请求。在
Environment 资源,在
EnvironmentConfig 资源,位于
dagGcsPrefix
资源是您环境的存储桶的地址。
示例:
GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment
Python
使用 google-auth 库 获取凭据并使用 requests 库 调用 REST API。
添加或更新 DAG
如需添加或更新 DAG,请将 DAG 的 Python .py
文件移至
环境存储桶中的 /dags
文件夹。
控制台
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,找到包含环境名称的行,然后在 DAG 文件夹列中,点击 DAG 链接。通过 存储分区详情页面即会打开。它会显示环境存储桶中
/dags
文件夹的内容。点击上传文件。然后,为 DAG 选择 Python
.py
文件 使用浏览器的对话框确认即可。
gcloud
gcloud composer environments storage dags import \
--environment ENVIRONMENT_NAME \
--location LOCATION \
--source="LOCAL_FILE_TO_UPLOAD"
替换:
ENVIRONMENT_NAME
替换为环境的名称。- 将
LOCATION
替换为环境所在的区域。 LOCAL_FILE_TO_UPLOAD
是 DAG 的 Python.py
文件。
示例:
gcloud composer environments storage dags import \
--environment example-environment \
--location us-central1 \
--source="example_dag.py"
更新具有活跃 DAG 运行作业的 DAG
如果您更新具有活跃 DAG 运行作业的 DAG:
- 所有当前正在执行的任务都会使用原始 DAG 文件继续运行。
- 已安排但尚未运行的所有任务都会使用更新后的 DAG 文件。
- 更新后的 DAG 文件中不再存在的所有任务均标记为 已移除。
更新频繁运行的 DAG
上传 DAG 文件后,Airflow 需要一些时间来加载此文件并更新此 DAG。如果您的 DAG 频繁运行,建议您进行如下设置: 确保 DAG 使用更新后的 DAG 文件版本。为此,请执行以下操作:
在 Airflow 界面中暂停 DAG。
上传更新后的 DAG 文件。
请等待,直到您在 Airflow 界面中看到更新。这意味着调度器已正确解析 DAG,并在 Airflow 数据库中进行了更新。
即使 Airflow 界面显示更新后的 DAG,也无法保证 Airflow 工作器拥有更新后的 DAG 文件版本。这是因为 DAG 文件不与调度器和工作器一起同步。
您可能需要延长等待时间,以确保 DAG 文件与环境中的所有工作器同步。系统每分钟执行多次同步。在运行状况良好的环境中,等待大约 20-30 秒足以让所有工作器完成同步。
(可选)如果要完全确保所有工作器都具有新版 DAG 文件,请检查每个工作器的日志。为此,请执行以下操作:
在 Google Cloud 控制台中打开环境的日志标签页。
转到 Composer 日志 > 基础架构 > Cloud Storage 同步项并检查以下集群中每个工作器的日志 您的环境查找最新的
Syncing dags directory
日志 时间戳在您上传新 DAG 文件之后的条目。如果您 其后面的Finished syncing
项,则 DAG 已成功在此工作器上同步。
取消暂停 DAG。
在您的环境中删除 DAG
如需删除 DAG,请从以下位置移除 DAG 的 Python .py
文件:
环境的存储桶中,创建环境的 /dags
文件夹。
控制台
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,找到包含您的环境名称的行 在 DAGs 文件夹列中,点击 DAGs 链接。存储桶详情页面随即打开。它会显示环境存储桶中
/dags
文件夹的内容。选择 DAG 文件,点击删除,然后确认操作。
gcloud
gcloud composer environments storage dags delete \
--environment ENVIRONMENT_NAME \
--location LOCATION \
DAG_FILE
替换:
ENVIRONMENT_NAME
替换为环境的名称。- 将
LOCATION
替换为环境所在的区域。 - 将
DAG_FILE
替换为 DAG 的 Python.py
文件。
示例:
gcloud composer environments storage dags delete \
--environment example-environment \
--location us-central1 \
example_dag.py
从 Airflow 界面中移除 DAG
如需从 Airflow 网页界面中移除 DAG 元数据,请执行以下操作:
Airflow 界面
- 前往您的环境的 Airflow 界面。
- 对于 DAG,点击删除 DAG。
gcloud
在早于 1.14.0 的 Airflow 1 版本中,运行以下命令: gcloud CLI:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
delete_dag -- DAG_NAME
在 Airflow 2、Airflow 1.14.0 及更高版本中,运行以下命令: gcloud CLI:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags delete -- DAG_NAME
您需要将其中的:
ENVIRONMENT_NAME
替换为环境的名称。LOCATION
替换为环境所在的区域。DAG_NAME
是要删除的 DAG 的名称。