Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本页介绍了如何在环境中启用对延迟操作符的支持,以及如何在 DAG 中使用延迟操作符。 Google Cloud
Cloud Composer 中的可推迟运算符简介
如果您有至少一个触发器实例(在高度弹性环境中至少有两个),则可以在 DAG 中使用可延期运算符和触发器。
对于可推迟的运算符,Airflow 会将任务执行分为以下阶段:
开始操作。在此阶段,任务会占用一个 Airflow 工作器槽位。该任务会执行将作业委托给其他服务的操作。
例如,运行 BigQuery 作业可能需要几秒钟到几小时。创建作业后,该操作会将工作标识符(BigQuery 作业 ID)传递给 Airflow 触发器。
触发器会监控作业,直到其完成。在此阶段,Worker 槽未占用。Airflow 触发器采用异步架构,能够处理数百个此类作业。当触发器检测到作业已完成时,它会发送一个事件来触发最后一个阶段。
在最后一个阶段,Airflow 工作器会执行回调。例如,此回调可以将任务标记为成功,或执行其他操作,并将作业设置为再次由触发器监控。
触发器是无状态的,因此能够抵御中断或重启。因此,长时间运行的作业对 pod 重启具有弹性,除非重启发生在预计时间较短的最后阶段。
准备工作
- 可延迟操作符和传感器在 Cloud Composer 2 环境中可用,并且需要满足以下条件:
- Cloud Composer 2.0.31 及更高版本
- Airflow 2.2.5、2.3.3 及更高版本
启用对延迟操作符的支持
一个名为 Airflow 触发器的环境组件会异步监控环境中的所有推迟任务。此类任务的延迟操作完成后,触发器会将任务传递给 Airflow 工作器。
您需要在环境中至少有一个触发器实例(在高度弹性环境中至少有两个),才能在 DAG 中使用可延迟模式。您可以在创建环境时配置触发器,也可以调整现有环境的触发器数量和性能参数。
支持延迟模式的Google Cloud 运算符
只有部分 Airflow 运算符已扩展为支持可推迟的模型。以下列表是对 airflow.providers.google.operators.cloud
软件包中支持延迟模式的运算符的参考。列出所需最低 airflow.providers.google.operators.cloud
软件包版本的列表示该运营商支持可推迟模式的最早软件包版本。
Cloud Composer 运算符
运营商名称 | 所需的 apache-airflow-providers-google 版本 |
---|---|
CloudComposerCreateEnvironmentOperator | 6.4.0 |
CloudComposerDeleteEnvironmentOperator | 6.4.0 |
CloudComposerUpdateEnvironmentOperator | 6.4.0 |
BigQuery 运算符
运营商名称 | 所需的 apache-airflow-providers-google 版本 |
---|---|
BigQueryCheckOperator | 8.4.0 |
BigQueryValueCheckOperator | 8.4.0 |
BigQueryIntervalCheckOperator | 8.4.0 |
BigQueryGetDataOperator | 8.4.0 |
BigQueryInsertJobOperator | 8.4.0 |
BigQuery Data Transfer Service 运算符
运营商名称 | 所需的 apache-airflow-providers-google 版本 |
---|---|
BigQueryDataTransferServiceStartTransferRunsOperator | 8.9.0 |
Cloud Build 运算符
运营商名称 | 所需的 apache-airflow-providers-google 版本 |
---|---|
CloudBuildCreateBuildOperator | 8.7.0 |
Cloud SQL 运算符
运营商名称 | 所需的 apache-airflow-providers-google 版本 |
---|---|
CloudSQLExportInstanceOperator | 10.3.0 |
Dataflow 运算符
运营商名称 | 所需的 apache-airflow-providers-google 版本 |
---|---|
DataflowTemplatedJobStartOperator | 8.9.0 |
DataflowStartFlexTemplateOperator | 8.9.0 |
Cloud Data Fusion 运算符
运营商名称 | 所需的 apache-airflow-providers-google 版本 |
---|---|
CloudDataFusionStartPipelineOperator | 8.9.0 |
Dataproc 运算符
运营商名称 | 所需的 apache-airflow-providers-google 版本 |
---|---|
DataprocCreateClusterOperator | 8.9.0 |
DataprocDeleteClusterOperator | 8.9.0 |
DataprocJobBaseOperator | 8.4.0 |
DataprocInstantiateWorkflowTemplateOperator | 9.0.0 |
DataprocInstantiateInlineWorkflowTemplateOperator | 10.1.0 |
DataprocSubmitJobOperator | 8.4.0 |
DataprocUpdateClusterOperator | 8.9.0 |
DataprocCreateBatchOperator | 8.9.0 |
Google Kubernetes Engine 操作器
运营商名称 | 所需的 apache-airflow-providers-google 版本 |
---|---|
GKEDeleteClusterOperator | 9.0.0 |
GKECreateClusterOperator | 9.0.0 |
AI Platform 运营商
运营商名称 | 所需的 apache-airflow-providers-google 版本 |
---|---|
MLEngineStartTrainingJobOperator | 8.9.0 |
在 DAG 中使用可延期运算符
所有 Google Cloud 运算符的通用惯例是使用 deferrable
布尔值参数启用可延迟模式。如果 Google Cloud运算符没有此参数,则无法在可推迟模式下运行。其他运算符可能采用不同的惯例。例如,某些社区运营商具有名称中带有 Async
后缀的单独类。
以下示例 DAG 在延迟模式下使用 DataprocSubmitJobOperator
运算符:
PYSPARK_JOB = {
"reference": { "project_id": "PROJECT_ID" },
"placement": { "cluster_name": "PYSPARK_CLUSTER_NAME" },
"pyspark_job": {
"main_python_file_uri": "gs://dataproc-examples/pyspark/hello-world/hello-world.py"
},
}
DataprocSubmitJobOperator(
task_id="dataproc-deferrable-example",
job=PYSPARK_JOB,
deferrable=True,
)
查看触发器日志
触发器会生成日志,这些日志与其他环境组件的日志一起提供。如需详细了解如何查看环境日志,请参阅查看日志。
监视器触发器
如需详细了解如何监控触发器组件,请参阅 Airflow 指标。
除了监控触发器之外,您还可以在环境的 Monitoring 信息中心的未完成的任务指标中查看推迟的任务数量。