Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3
本页面介绍什么是 Airflow 向后移植提供程序软件包,以及如何在 DAG 中使用它们。
向后移植提供程序软件包是 Airflow 2 版本的运算符、转移、传感器、钩子和密钥,这些将打包为 PyPI 模块。
为了方便介绍,本文档仅重点介绍运算符。您可以像使用运算符一样,使用向后移植软件包中的其他所有内容。
向后移植提供程序软件包解决了一个重要问题:您可以获取运算符、转移、传感器和钩子的新特性和安全更新,而无需将 Airflow 环境升级到更高版本。例如,Airflow 2 中提供了 Google 提供程序软件包。您可以从 Airflow 1.10.* 环境中此软件包的向后移植版本导入运算符。
在 Apache Airflow 引入向后移植提供程序软件包的概念之前,运算符是 Airflow 不可或缺的一部分。为了获取新的运算符版本,您需要更新版本的 Airflow。这需要将 Cloud Composer 环境升级到更高版本。Airflow 2 逐步弃用此模型,并引入提供程序软件包。Airflow 2 的提供程序软件包是一个 PyPI 模块,包含特定提供程序(例如 Google)的运算符、转移、传感器、钩子和密钥。如果您使用的是 Airflow 1.10.*,则可以从这项变更中受益,并使用向后移植的提供程序软件包版本。
使用预安装的向后移植软件包
Cloud Composer 映像中已安装一些向后移植软件包。您不需要将这些向后移植软件包安装到您的环境中。只需确保在 DAG 代码中从向后移植软件包导入运算符,具体如本页面后面部分所述。
如需查看您的环境中有哪些向后移植软件包,请参阅您的环境的 Cloud Composer 映像中的软件包列表。
您的环境中每个预安装的向后移植软件包都有特定版本。如果要使用其他版本,请更新环境并指定所需版本。我们不建议降级预安装的向后移植软件包。仅当您发现更新版本存在问题时,才需要安装早期版本。您无法卸载预安装的向后移植软件包,只能更改安装的版本。
安装和升级向后移植软件包
如需安装或升级反向移植软件包,请执行以下操作:
像任何其他 PyPI 软件包一样安装或升级软件包。
如果需要,请安装跨提供商软件包的依赖项。这些是使用向后移植软件包的所有功能可能需要的额外依赖项。
例如,要使用
apache-airflow-backport-providers-google
中的SalesforceToGcsOperator
,则需要[salesforce]
extra。安装apache-airflow-backport-providers-google
,并在 Extras 参数和版本字段中指定[salesforce]
extra。
在 DAG 中从向后移植提供程序软件包导入运算符
如需查看向后移植软件包中的内容列表,请转到 PyPI.org 上的向后移植软件包页面。例如,
apache-airflow-backport-providers-google
列出该软件包的操作符、传输、传感器、钩子和密钥。
向后移植提供程序软件包引入了新的和已移动的运算符以及其他内容。这两种内容存在差异,稍后会加以说明。
导入新运算符
新运算符是指 Airflow 1.10.* 中不存在的运算符。如果您尝试在没有此类向后移植软件包的情况下导入此类运算符,则会收到导入错误。
如需使用向后移植软件包中的新运算符,请将其从对应的 airflow.providers.*
软件包中导入,如 PyPI.org 上此向后软件包软件包的页面所述。
以下示例从
apache-airflow-backport-providers-google
软件包:
from airflow.providers.google.cloud.operators.bigquery_dts import (
BigQueryCreateDataTransferOperator,
BigQueryDeleteDataTransferConfigOperator,
)
导入已移动的运算符
已移动的运算符是指 Airflow 1.10.* 中已经存在的运算符。安装向后移植运算符软件包后,您可以导入一个运算符的两个不同版本。一个版本与 Airflow 捆绑在一起,另一个版本是已移动的运算符。如需使用已移动的运算符,请使用新的导入路径将其导入。
如需使用向后移植软件包中已移动的运算符,请将其从对应的 airflow.contrib.*
软件包中导入,如 PyPI.org 上此向后软件包软件包的页面所述。
以下示例从 apache-airflow-backport-providers-google
软件包中导入已移动的运算符:
from airflow.contrib.operators.bigquery_operator import (
BigQueryCreateEmptyDatasetOperator,
BigQueryOperator,
)