Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本教程将指导您诊断和排查导致调度程序故障、解析错误和延迟以及任务失败的任务调度和解析问题。
简介
Airflow 调度器主要受以下两个因素影响:任务调度和 DAG 解析。其中一个因素出现问题都可能会对环境健康和性能产生负面影响。
有时,系统会同时安排过多任务。在这种情况下,队列会被填满,任务会保持“已调度”状态,或在加入队列后重新调度,这可能会导致任务失败和性能延迟。
另一个常见问题是解析延迟时间和由 DAG 代码复杂性导致的错误。例如,如果 DAG 代码在代码顶层包含 Airflow 变量,则可能会导致解析延迟、数据库过载、调度失败和 DAG 超时。
在本教程中,您将诊断示例 DAG,并学习如何排查调度和解析问题、改进 DAG 调度,以及优化 DAG 代码和环境配置以提升性能。
目标
本部分列出了本教程中示例的目标。
示例:任务并发度较高导致调度程序故障和延迟
上传同时运行多次的示例 DAG,并使用 Cloud Monitoring 诊断调度程序故障和延迟问题。
通过合并任务来优化 DAG 代码,并评估性能影响。
随着时间推移,更平均地分配任务,并评估对性能的影响。
优化 Airflow 配置和环境配置,并评估影响。
示例:复杂代码导致的 DAG 解析错误和延迟时间
上传包含 Airflow 变量的示例 DAG,并使用 Cloud Monitoring 诊断解析问题。
通过避免在代码顶级使用 Airflow 变量来优化 DAG 代码,并评估对解析时间的影响。
优化 Airflow 配置和环境配置,并评估对解析时间的影响。
费用
本教程使用 Google Cloud的以下可计费组件:
完成本教程后,您可以删除所创建的资源以避免继续计费。如需了解详情,请参阅清理。
准备工作
本部分介绍了在开始本教程之前需要执行的操作。
创建和配置项目
在本教程中,您需要一个 Google Cloud 项目。按如下方式配置项目:
在 Google Cloud 控制台中,选择或创建项目:
确保您的项目已启用结算功能。 了解如何检查项目是否已启用结算功能。
确保您的 Google Cloud 项目用户具有以下角色,以便创建必要的资源:
- Environment and Storage Object Administrator
(
roles/composer.environmentAndStorageObjectAdmin
) - Compute Admin (
roles/compute.admin
)
- Environment and Storage Object Administrator
(
为您的项目启用 API
Enable the Cloud Composer API.
创建 Cloud Composer 环境
在创建环境的过程中,您需要向 Composer Service Agent 账号授予 Cloud Composer v2 API Service Agent Extension (roles/composer.ServiceAgentV2Ext
) 角色。Cloud Composer 使用此账号在您的 Google Cloud 项目中执行操作。
示例:由于任务调度问题,调度器故障和任务失败
此示例演示了调试调度程序故障以及高任务并发导致的延迟。
将示例 DAG 上传到您的环境
将以下示例 DAG 上传到您在前面步骤中创建的环境。在本教程中,此 DAG 名为 dag_10_tasks_200_seconds_1
。
此 DAG 包含 200 个任务。每个任务都会等待 1 秒钟,然后输出“Complete!”。DAG 在上传后会自动触发。Cloud Composer 会运行此 DAG 10 次,并且所有 DAG 运行都是并行进行的。
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 200
seconds = 1
minutes = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
start_date=datetime(2023, 11, 22, 20, 0),
end_date=datetime(2023, 11, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
诊断调度器故障和任务失败问题
DAG 运行完毕后,打开 Airflow 界面,然后点击 dag_10_tasks_200_seconds_1
DAG。您会看到,总共有 10 次 DAG 运行作业成功,并且每项作业都有 200 个成功的任务。
查看 Airflow 任务日志:
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称。环境详情页面会打开。
前往日志标签页,然后依次选择所有日志 > Airflow 日志 > 工作器 > 在日志浏览器中查看。
在日志直方图中,您可以看到以红色和橙色表示的错误和警告:

示例 DAG 产生了大约 130 条警告和 60 条错误。点击包含黄色和红色条形的任意列。您会在日志中看到以下某些警告和错误:
State of this instance has been externally set to success. Terminating
instance.
Received SIGTERM. Terminating subprocesses.
worker: Warm shutdown (MainProcess).
这些日志可能表明资源用量超出了限制,并且 worker 自行重启。
如果 Airflow 任务在队列中保留的时间过长,则调度器会将其标记为 failed 和 up_for_retry,并且将再次重新安排它以执行。观察这种情况的一种方法是查看显示已加入队列的任务数量的图表,如果此图表中的峰值在 10 分钟左右没有下降,则任务可能会失败(没有日志)。
查看监控信息:
前往监控标签页,然后选择概览。
查看 Airflow 任务图表。
图 2. Airflow 任务图表(点击可放大) Airflow 任务图表中,排队任务数量出现了持续超过 10 分钟的高峰,这可能表示您的环境中没有足够的资源来处理所有计划任务。
查看活跃工作器数图表:
图 3. 活跃工作器数图表(点击可放大) 活跃工作器图表表明,DAG 在运行期间触发了自动扩缩,使工作器数量达到了允许的上限(3 个工作器)。
资源使用情况图表可能会表明 Airflow 工作器没有足够的容量来运行队列中的任务。在 Monitoring(监控)标签页中,选择 Workers(工作器),然后查看 Total worker CPU usage(工作器 CPU 总用量)和 Total worker memory usage(工作器内存总用量)图表。
图 4. 工作器总 CPU 使用情况图表(点击可放大) 图 5. 工作器内存总用量图表(点击可放大) 图表表明,由于同时执行的任务过多,导致 CPU 用量达到上限。资源的使用时间已超过 30 分钟,这甚至比 10 个 DAG 依次运行的 200 个任务的总时长还要长。
这些指标表示队列已满,并且缺少资源来处理所有已安排的任务。
合并任务
当前代码会创建许多 DAG 和任务,但没有足够的资源来并行处理所有任务,这会导致队列被填满。在队列中保留任务的时间过长可能会导致任务重新安排或失败。在这种情况下,您应该选择使用较少的整合任务。
以下示例 DAG 将初始示例中的任务数量从 200 更改为 20,并将等待时间从 1 秒增加到 10 秒,以模拟执行相同工作量的更多合并任务。
将以下示例 DAG 上传到您创建的环境。在本教程中,此 DAG 名为 dag_10_tasks_20_seconds_10
。
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 20
seconds = 10
minutes = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
评估更多合并的任务对调度流程的影响:
等待 DAG 运行完成。
在 Airflow 界面的 DAG 页面上,点击
dag_10_tasks_20_seconds_10
DAG。您会看到 10 个 DAG 运行作业,每个作业都有 20 个成功的任务。在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称。环境详情页面会打开。
前往日志标签页,然后依次选择所有日志 > Airflow 日志 > 工作器 > 在日志浏览器中查看。
第二个示例包含更多合并的任务,因此产生了大约 10 个警告和 7 个错误。在直方图中,您可以比较第一个示例(早期值)和第二个示例(后期值)中的错误和警告数量。
图 6. 任务合并后的 Airflow 工作器日志直方图(点击可放大) 将第一个示例与更为整合的示例进行比较时,您会发现第二个示例中的错误和警告明显更少。不过,由于资源过载,日志中仍会显示与热重启相关的相同错误。
在监控标签页中,选择工作器,然后查看图表。
将第一个示例(较早值)的 Airflow 任务图表与第二个示例(任务更集中)的图表进行比较时,您会发现,当任务更集中时,队列中任务的激增持续时间更短。不过,它持续了近 10 分钟,仍然不太理想。
图 7. 任务合并后的 Airflow 任务图表(点击可放大) 在“活跃工作器数”图表中,您可以看到第一个示例(图表左侧)使用资源的时间比第二个示例长得多,即使这两个示例模拟的工作量相同。
图 8. 任务合并后的活跃工作器数图表(点击可放大) 查看工作器资源消耗图表。尽管包含更多合并任务的示例与初始示例中使用的资源差异很大,但 CPU 使用率仍会飙升到上限的 70%。
图 9. 任务合并后的总工作器 CPU 使用情况图表(点击可放大) 图 10. 任务合并后的总工作器内存用量图表(点击可放大)
随着时间推移更平均地分配任务
并发任务过多会导致队列被填满,进而导致任务卡在队列中或重新调度。在前面的步骤中,您通过合并这些任务来减少任务数量,但输出日志和监控结果表明,并发任务的数量仍然不太理想。
您可以通过实现时间表或设置可以同时运行的任务数量限制来控制并发任务运行的数量。
在本教程中,您将向 dag_10_tasks_20_seconds_10
DAG 添加 DAG 级参数,以便随着时间推移更平均地分配任务:
向 DAG 上下文管理器添加
max_active_runs=1
参数。此参数会限制在给定时间内仅运行单个 DAG 实例。向 DAG 上下文管理器添加
max_active_tasks=5
参数。此参数用于控制每个 DAG 中可以并发运行的任务实例数上限。
将以下示例 DAG 上传到您创建的环境。在本教程中,此 DAG 名为 dag_10_tasks_20_seconds_10_scheduled.py
。
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
max_active_runs=active_runs,
max_active_tasks=active_tasks,
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
评估随时间分布任务对调度进程的影响:
等待 DAG 运行完成。
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称。环境详情页面会打开。
前往日志标签页,然后依次选择所有日志 > Airflow 日志 > 工作器 > 在日志浏览器中查看。
在直方图中,您可以看到,第三个 DAG 的有效任务和运行数量有限,未生成任何警告或错误,并且日志分布看起来比之前的值更均匀。
图 11. 任务随时间推移进行合并和分发后的 Airflow 工作器日志直方图(点击可放大)
dag_10_tasks_20_seconds_10_scheduled
示例中的任务具有有限的活跃任务数和运行次数,但由于任务均匀加入队列,因此并未造成资源压力。
执行上述步骤后,您通过合并小任务并在一段时间内更均匀地分布这些任务,优化了资源使用情况。
优化环境配置
您可以调整环境配置,确保 Airflow 工作器始终具有容量来运行已加入队列的任务。
工作器数量和工作器并发数
您可以调整工作器数量上限,让 Cloud Composer 在设定的限制内自动扩缩您的环境。
[celery]worker_concurrency
参数定义单个工作器可以从任务队列中获取的任务数量上限。更改此参数可调整单个工作器可以同时执行的任务量。您可以通过替换此 Airflow 配置选项来更改此选项。默认情况下,工作器并发数设置为以下值的下限:32, 12 * worker_CPU, 8 * worker_memory
,这意味着它取决于工作器资源限制。如需详细了解默认工作器并发值,请参阅优化环境。
工作器数量和工作器并发性相互作用,并且环境性能在很大程度上取决于这两个参数。您可以参考以下注意事项来选择正确的组合:
并行运行多个快速任务。当队列中有任务在等待,同时工作器在使用一小部分 CPU 和内存时,您可以提高工作器并发性。不过,在某些情况下,队列可能永远不会填满,导致自动扩缩功能永远不会触发。如果小任务在新工作器准备就绪之前完成执行,现有工作器可以接收剩余任务,而新创建的工作器将没有任务。
在这些情况下,建议增加工作器数量下限并提高工作器并发数量,以避免过度扩缩。
并行运行多个长时间运行的任务。较高的工作器并发性会阻止系统扩缩工作器数量。如果多个任务需要大量资源且需要很长时间才能完成,那么工作器并发性较高可能会导致队列永远不会填满,并且所有任务都由一个工作器处理,从而导致性能问题。在这些情况下,建议增加工作器数量上限并降低工作器并发数量。
并行的重要性
Airflow 调度器控制 DAG 运行和 DAG 中各项任务的时间安排。[core]parallelism
Airflow 配置选项可控制在满足任务的所有依赖项后,Airflow 调度器可将多少任务添加到执行程序的队列中。
并行性是 Airflow 的一种保护机制,用于确定每个调度器可以同时运行多少任务,而不管工作器数量如何。并行处理值乘以集群中的调度器数量,就是您的环境可以加入队列的任务实例的数量上限。
通常,[core]parallelism
会设置为工作器数量上限和 [celery]worker_concurrency
的乘积。它还会受到广告资源池的影响。您可以通过替换此 Airflow 配置选项来更改此选项。如需详细了解如何调整与扩缩相关的 Airflow 配置,请参阅扩缩 Airflow 配置。
查找最佳环境配置
建议您将小任务合并为更大的任务,并随着时间推移更均匀地分配任务,以解决调度问题。除了优化 DAG 代码之外,您还可以优化环境配置,以便有足够的容量来并行运行多个任务。
例如,假设您尽可能在 DAG 中合并任务,但限制活跃任务以使其在时间上更均匀地分布,对您的特定用例而言并不是首选解决方案。
您可以调整并行性、工作器数量和工作器并发性参数,以运行 dag_10_tasks_20_seconds_10
DAG,而不会限制正在执行的任务。在此示例中,DAG 会运行 10 次,每次运行包含 20 个小任务。如果您想同时运行所有测试,请执行以下操作:
您需要更大的环境大小,因为它用于控制环境的代管式 Cloud Composer 基础架构的性能参数。
Airflow 工作器必须能够同时运行 20 个任务,这意味着您需要将工作器并发设置为 20。
工作器需要足够的 CPU 和内存才能处理所有任务。工作器并发性会受到工作器 CPU 和内存的影响,因此您需要至少有
worker_concurrency / 12
个 CPU 和least worker_concurrency / 8
内存。您需要提高并行性,以匹配更高的并发工作器数量。为了让工作器从队列中提取 20 个任务,调度程序需要先调度这 20 个任务。
请按以下方式调整环境配置:
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称。环境详情页面会打开。
转到环境配置标签页。
找到资源 > 工作负载配置,然后点击修改。
在工作器部分的内存字段中,为 Airflow 工作器指定新的内存限制。在本教程中,请使用 4 GB。
在 CPU 字段中,为 Airflow 工作器指定新的 CPU 限制。在本教程中,请使用 2 个 vCPU。
保存更改,并等待几分钟,让 Airflow 工作器重启。
接下来,替换并行性和工作器并发性 Airflow 配置选项:
前往 Airflow 配置替换标签页。
点击修改,然后点击添加 Airflow 配置替换。
替换并行处理配置:
部分 键 值 core
parallelism
20
点击添加 Airflow 配置替换,然后替换 worker 并发配置:
部分 键 值 celery
worker_concurrency
20
点击保存,然后等待环境更新其配置。
使用调整后的配置再次触发同一示例 DAG:
在 Airflow 界面中,前往 DAG 页面。
找到
dag_10_tasks_20_seconds_10
DAG 并将其删除。删除 DAG 后,Airflow 会检查环境存储分区中的 DAG 文件夹,并自动再次运行 DAG。
DAG 运行完成后,请再次查看“日志”直方图。从图表中,您可以看到,使用调整后的环境配置运行包含更多合并任务的 dag_10_tasks_20_seconds_10
示例时,未生成任何错误和警告。将结果与图表中的前面数据进行比较,在默认环境配置下运行时,同一示例会生成错误和警告。

环境配置和 Airflow 配置在任务调度中发挥着至关重要的作用,但无法超出特定限制来增加配置。
我们建议优化 DAG 代码、合并任务,并使用调度功能来优化性能和效率。
示例:由于 DAG 代码复杂而导致 DAG 解析错误和延迟
在本例中,您将调查模拟 Airflow 变量过多的示例 DAG 的解析延迟时间。
创建新的 Airflow 变量
在上传示例代码之前,请创建一个新的 Airflow 变量。
在 Google Cloud 控制台中,前往环境页面。
在 Airflow Web 服务器列中,点击与您的环境对应的 Airflow 链接。
依次前往管理 > 变量 > 添加新记录。
设置以下值:
- 键:
example_var
- val:
test_airflow_variable
- 键:
将示例 DAG 上传到您的环境
将以下示例 DAG 上传到您在前面步骤中创建的环境。在本教程中,此 DAG 名为 dag_for_loop_airflow_variable
。
此 DAG 包含一个运行 1,000 次的 for 循环,模拟了过多的 Airflow 变量。每次迭代都会读取 example_var
变量并生成一个任务。每个任务都包含一个用于输出变量值的命令。
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 2, 17),
'retries': 0
}
dag = DAG(
'dag_for_loop_airflow_variable',
default_args=default_args,
catchup=False,
schedule_interval="@daily"
)
for i in range(1000):
a = Variable.get('example_var', 'N/A')
task = BashOperator(
task_id=f'task_{i}',
bash_command='echo variable foo=${foo_env}',
dag=dag,
env={'foo_env': a}
)
诊断解析问题
DAG 解析时间是 Airflow 调度器读取和解析 DAG 文件所需的时间。在 Airflow 调度器可以安排 DAG 中的任何任务之前,调度器必须解析 DAG 文件,以发现 DAG 的结构和定义的任务。
如果 DAG 需要很长时间才能解析,这会占用调度器的容量,并且可能会降低 DAG 运行的性能。
如需监控 DAG 解析时间,请执行以下操作:
在 gcloud CLI 中运行
dags report
Airflow CLI 命令,查看所有 DAG 的解析时间:gcloud composer environments run ENVIRONMENT_NAME \ --location LOCATION \ dags report
替换以下内容:
ENVIRONMENT_NAME
:您的环境的名称。LOCATION
:环境所在的区域。
在命令的输出中,查找
dag_for_loop_airflow_variables
DAG 的时长值。如果值较大,则可能表示此 DAG 未以最佳方式实施。如果您有多个 DAG,则可以从输出表中确定哪些 DAG 的解析时间较长。示例:
file | duration | dag_num | task_num | dags ====================+================+=========+==========+===================== /dag_for_loop_airfl | 0:00:14.773594 | 1 | 1000 | dag_for_loop_airflow ow_variable.py | | | | _variable /airflow_monitoring | 0:00:00.003035 | 1 | 1 | airflow_monitoring .py
在 Google Cloud 控制台中检查 DAG 解析时间:
- 在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称。环境详情页面会打开。
前往 Logs(日志)标签页,然后依次选择 All logs(所有日志)> DAG processor manager(DAG 处理器管理器)。
查看
dag-processor-manager
日志并确定可能存在的问题。图 13. DAG 处理器管理器日志显示 DAG 解析时间(点击可放大)
如果 DAG 总解析时间超过约 10 秒,则调度器可能因 DAG 解析而过载,因而无法有效运行 DAG。
优化 DAG 代码
建议避免在 DAG 中使用不必要的“顶级”Python 代码。如果 DAG 包含许多外部导入、变量和函数,则会导致 Airflow 调度器的解析时间更长。这会降低 Cloud Composer 和 Airflow 的性能和可伸缩性。过多读取 Airflow 变量会导致解析时间过长和数据库负载过高。如果此代码位于 DAG 文件中,则这些函数会在每次调度程序心跳时执行,这可能会很慢。
借助 Airflow 的模板字段,您可以将 Airflow 变量和 Jinja 模板中的值纳入 DAG。这可防止在调度程序心跳期间执行不必要的函数。
如需更好地实现 DAG 示例,请避免在 DAG 的顶级 Python 代码中使用 Airflow 变量。而是通过 Jinja 模板将 Airflow 变量传递给现有运算符,这样系统会延迟读取值,直到任务执行时再读取。
将新版示例 DAG 上传到您的环境。在本教程中,此 DAG 名为 dag_for_loop_airflow_variable_optimized
。
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 2, 17),
'retries': 0
}
dag = DAG(
'dag_for_loop_airflow_variable_optimized',
default_args=default_args,
catchup=False,
schedule_interval='@daily'
)
for i in range(1000):
task = BashOperator(
task_id=f'bash_use_variable_good_{i}',
bash_command='echo variable foo=${foo_env}',
dag=dag,
env={'foo_env': '{{ var.value.get("example_var") }}'},
)
检查新的 DAG 解析时间:
等待 DAG 运行完成。
再次运行
dags report
命令,查看所有 DAG 的解析时间:file | duration | dag_num | task_num | dags ====================+================+=========+==========+===================== /dag_for_loop_airfl | 0:00:37.000369 | 1 | 1000 | dag_for_loop_airflow ow_variable.py | | | | _variable /dag_for_loop_airfl | 0:00:01.109457 | 1 | 1000 | dag_for_loop_airflow ow_variable_optimiz | | | | _variable_optimized ed.py | | | | /airflow_monitoring | 0:00:00.040510 | 1 | 1 | airflow_monitoring .py | | | |
再次查看
dag-processor-manager
日志,并分析解析时长。图 14. DAG 处理器管理器日志显示 DAG 代码优化后的 DAG 解析时间(点击可放大)
通过将环境变量替换为 Airflow 模板,您简化了 DAG 代码,并将解析延迟时间缩短了约 10 倍。
优化 Airflow 环境配置
Airflow 调度器会不断尝试触发新任务并解析环境存储分区中的所有 DAG。如果 DAG 的解析时间很长,并且调度器会消耗大量资源,您可以优化 Airflow 调度器配置,以便调度器更高效地使用资源。
在本教程中,DAG 文件需要花费很长时间才能解析,解析周期开始重叠,然后耗尽调度器的容量。在我们的示例中,第一个示例 DAG 需要超过 5 秒才能解析,因此您需要将调度程序配置为运行频率更低,以便更高效地使用资源。您将替换 scheduler_heartbeat_sec
Airflow 配置选项。此配置定义了调度程序应运行的频率(以秒为单位)。默认情况下,该值设置为 5 秒。
您可以通过替换此 Airflow 配置选项来更改此选项。
替换 scheduler_heartbeat_sec
Airflow 配置选项:
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称。环境详情页面会打开。
前往 Airflow 配置替换标签页。
点击修改,然后点击添加 Airflow 配置替换。
替换 Airflow 配置选项:
部分 键 值 scheduler
scheduler_heartbeat_sec
10
点击保存,然后等待环境更新其配置。
检查调度器指标:
前往 Monitoring(监控)标签页,然后选择 Schedulers(调度程序)。
在调度程序心跳图表中,点击更多选项按钮(三点状图标),然后点击在 Metrics Explorer 中查看。

在图表中,您会看到将默认配置从 5 秒更改为 10 秒后,调度程序的运行频率降低了两倍。通过降低心跳频率,您可以确保在之前的解析周期进行且调度程序的资源容量未耗尽时,调度程序不会开始运行。
为调度程序分配更多资源
在 Cloud Composer 2 中,您可以向调度程序分配更多 CPU 和内存资源。这样,您可以提高调度器的性能并缩短 DAG 的解析时间。
向调度器分配额外的 CPU 和内存:
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称。环境详情页面会打开。
转到环境配置标签页。
找到资源 > 工作负载配置,然后点击修改。
在调度程序部分的内存字段中,指定新的内存限制。在本教程中,请使用 4 GB。
在 CPU 字段中,指定新的 CPU 上限。在本教程中,请使用 2 个 vCPU。
保存更改,并等待几分钟,让 Airflow 调度器重启。
前往 Logs(日志)标签页,然后依次选择 All logs(所有日志)> DAG processor manager(DAG 处理器管理器)。
查看
dag-processor-manager
日志,并比较示例 DAG 的解析时长:图 16. DAG 处理器管理器日志显示向调度器分配更多资源后的 DAG 解析时间(点击可放大)
与默认环境配置相比,通过向调度程序分配更多资源,您提高了调度程序的容量并显著缩短了解析延迟时间。资源越多,调度程序解析 DAG 的速度就越快,但与 Cloud Composer 资源相关的费用也会增加。此外,资源数量无法超出一定的限制。
我们建议您仅在实现可能的 DAG 代码和 Airflow 配置优化后再分配资源。
清理
为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。
删除项目
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
删除各个资源
如果您打算探索多个教程和快速入门,重复使用项目可以帮助您避免超出项目配额上限。
删除 Cloud Composer 环境。在此过程中,您还会删除环境的存储分区。