Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3
本页面介绍如何将 DAG,数据和配置从现有 Airflow 1.10.* 环境转移到具有 Airflow 2 及更高版本的 Airflow 环境。
其他迁移指南
从 | 收件人 | 方法 | 指南 |
---|---|---|---|
Cloud Composer 1、Airflow 2 | Cloud Composer 2、Airflow 2 | 并排显示,使用快照 | 迁移指南(快照) |
Cloud Composer 1、Airflow 1 | Cloud Composer 2、Airflow 2 | 并排,使用快照 | 迁移指南(快照) |
Cloud Composer 1、Airflow 2 | Cloud Composer 2、Airflow 2 | 并排手动传输 | 手动迁移指南 |
Cloud Composer 1、Airflow 1 | Cloud Composer 2、Airflow 2 | 并排手动转移 | 手动迁移指南 |
Airflow 1 | Airflow 2 | 并排手动转移 | 本指南(手动迁移) |
并列升级
通过 Cloud Composer 提供的 Cloud Composer 数据库转移脚本,您可以将元数据数据库、DAG、数据和插件从使用 Airflow 1.10.14 和 Airflow 1.10.15 的 Cloud Composer 环境迁移至使用 Airflow 2.0.1 及更高版本的现有 Cloud Composer 环境。
这是本指南中介绍的路径的替代路径。在使用提供的脚本时,本指南的某些部分仍然适用。例如,您可能希望在迁移前检查 DAG 是否兼容 Airflow 2,或者确保并发 DAG 运行不会发生,且没有额外或缺失的 DAG 运行。
准备工作
在开始将 Cloud Composer 环境与 Airflow 2 结合使用之前,请考虑 Airflow 2 为 Cloud Composer 环境所做的更改。
调度器高可用性
您可以在您的环境中使用多个 Airflow 调度器。您可以在创建环境时设置调度器的数量,或者通过更新现有环境进行更改。
Celery+Kubernetes Executor
Airflow 2 Celery+Kubernetes 执行器 支持 Cloud Composer 3。
重大更改
Airflow 2 引入了许多重大更改,其中一些更改重大:
- Airflow 1.10.* 中的现有 DAG 无法保证与 Airflow 2 搭配使用。它们需要进行测试并可能需要调整。
- 迁移到提供商软件包的操作、转移和钩子。DAG 中的导入语句必须使用新的提供程序软件包。旧版 import 语句可能不适用于 Airflow 2。
- 某些 Airflow 1.10.* 配置可能不再受支持,因为 Airflow 2 不再支持某些配置选项。
- 某些自定义 PyPI 软件包可能与新版本 Airflow 或 Python 不兼容。
- 带有访问权限控制的 Airflow 界面是默认的 Airflow 2 界面。Airflow 2 不支持其他 Airflow 界面类型。
- 实验性 REST API 已替换为稳定版 Airflow API。在 Airflow 2 中,实验性 REST API 默认处于停用状态。
- Airflow 2.0.0 中的其他重大更改
- Airflow 2.0.1 中的其他重大变更
Airflow 2 和 Airflow 1.10 的环境之间存在差异.*
使用 Airflow 1.10.* 的 Cloud Composer 环境与使用 Airflow 2 的环境之间的主要区别:
- 采用 Airflow 2 的环境使用 Python 3.8。此版本比 Airflow 1.10.* 环境中使用的版本更新。不支持 Python 2、Python 3.6 和 Python 3.7。
- Airflow 2 使用不同的 CLI 格式。在使用 Airflow 2 的环境中,Cloud Composer 支持通过
gcloud composer environments run
命令使用新格式。 - 在 Airflow 2 环境中,预安装的 PyPI 软件包有所不同。如需预安装 PyPI 软件包的列表,请参阅 Cloud Composer 版本列表。
- Airflow 2 中始终启用 DAG 序列化。因此,不再需要异步 DAG 加载,并且 Airflow 2 也不支持加载异步 DAG。因此,Airflow 2 不支持配置
[core]store_serialized_dags
和[core]store_dag_code
参数,尝试设置这些参数的操作会报告为错误。 - 不支持 Airflow Web 服务器插件。这不会影响调度器或工作器插件,包括 Airflow 运算符和传感器。
- 在 Airflow 2 环境中,默认 Airflow 用户角色为
Op
。对于具有 Airflow 1.10.* 的环境,默认角色为Admin
。
第 1 步:检查与 Airflow 2 的兼容性
如需检查与 Airflow 2 是否存在潜在冲突,请参阅“升级到 Airflow 2.0+ 指南中的 升级 DAG。
您可能会遇到的一个常见问题与导入路径不兼容。如需详细了解如何解决此兼容性问题,请参阅 升级到 Airflow 2.0+ 指南,请参阅 关于向后移植提供程序的部分。
第 2 步:创建 Airflow 2 环境,转移配置替换和环境变量
创建 Airflow 2 环境并转移配置替换和环境变量:
按照创建环境中的步骤操作。在创建环境之前,还要指定配置替换和环境变量,如下文所述。
选择映像时,请选择具有 Airflow 2 的映像。
手动将配置参数从 Airflow 1.10.* 环境转移到新的 Airflow 2 环境。
控制台
创建环境时,展开网络、Airflow 配置替换及其他功能部分。
在 Airflow 配置替换下方,点击添加 Airflow 配置替换。
从 Airflow 1.10.* 环境中复制所有配置替换。
某些配置选项在 Airflow 2 中使用不同的名称和部分。如需了解详情,请参阅配置更改。
在环境变量下,点击添加环境变量
从 Airflow 1.10.* 环境中复制所有环境变量。
点击创建以创建环境。
第 3 步:将 PyPI 软件包安装到 Airflow 2 环境
创建 Airflow 2 环境后,在其中安装 PyPI 软件包:
控制台
在 Google Cloud 控制台中,前往环境页面。
选择您的 Airflow 2 环境。
转到 PyPI 软件包标签页,然后点击修改。
从 Airflow 1.10.* 环境中复制 PyPI 软件包要求。点击保存,然后等待您的环境更新。
由于 Airflow 2 环境使用了一组不同的预装软件包和不同的 Python 版本,因此可能会遇到 PyPI 软件包冲突,难以解决。如需诊断软件包依赖项问题,一种方法是在 Airflow 工作器 pod 中安装软件包来检查 PyPI 软件包错误。
第 4 步:将变量和池转移到 Airflow 2
Airflow 1.10.* 支持将变量和池导出到 JSON 文件。然后,您可以将这些文件导入您的 Airflow 2 环境。
您只有在拥有 default_pool
以外的自定义池时才需要转移池。否则,请跳过导出和导入池的命令。
gcloud
从 Airflow 1.10.* 环境中导出变量:
gcloud composer environments run AIRFLOW_1_ENV \ --location AIRFLOW_1_LOCATION \ variables -- -e /home/airflow/gcs/data/variables.json
替换:
- 将
AIRFLOW_1_ENV
替换为您的 Airflow 1.10.* 环境名称。 AIRFLOW_1_LOCATION
替换为环境所在的区域。
- 将
从 Airflow 1.10.* 环境中导出池:
gcloud composer environments run AIRFLOW_1_ENV \ --location AIRFLOW_1_LOCATION \ pool -- -e /home/airflow/gcs/data/pools.json
获取 Airflow 2 环境存储分区 URI。
运行以下命令:
gcloud composer environments describe AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ --format="value(config.dagGcsPrefix)"
替换:
- 将
AIRFLOW_2_ENV
替换为您的 Airflow 2 环境名称。 - 将
AIRFLOW_2_LOCATION
替换为环境所在的区域。
- 将
在输出中,移除
/dags
文件夹。结果是 Airflow 2 环境存储桶的 URI。例如,将
gs://us-central1-example-916807e1-bucket/dags
更改为gs://us-central1-example-916807e1-bucket
。
将包含变量和池的 JSON 文件转移到您的 Airflow 2 环境:
gcloud composer environments storage data export \ --destination=AIRFLOW_2_BUCKET/data \ --environment=AIRFLOW_1_ENV \ --location=AIRFLOW_1_LOCATION \ --source=variables.json gcloud composer environments storage data export \ --destination=AIRFLOW_2_BUCKET/data \ --environment=AIRFLOW_1_ENV \ --location=AIRFLOW_1_LOCATION \ --source=pools.json
将
AIRFLOW_2_BUCKET
替换为在上一步中获得的 Airflow 2 环境存储分区的 URI。将变量和池导入 Airflow 2:
gcloud composer environments run \ AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ variables import \ -- /home/airflow/gcs/data/variables.json gcloud composer environments run \ AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ pools import \ -- /home/airflow/gcs/data/pools.json
检查已导入的变量和池:
gcloud composer environments run \ AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ variables list gcloud composer environments run \ AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ pools list
从存储分区中移除 JSON 文件:
gcloud composer environments storage data delete \ variables.json \ --environment=AIRFLOW_2_ENV \ --location=AIRFLOW_2_LOCATION gcloud composer environments storage data delete \ pools.json \ --environment=AIRFLOW_2_ENV \ --location=AIRFLOW_2_LOCATION gcloud composer environments storage data delete \ variables.json \ --environment=AIRFLOW_1_ENV \ --location=AIRFLOW_1_LOCATION gcloud composer environments storage data delete \ pools.json \ --environment=AIRFLOW_1_ENV \ --location=AIRFLOW_1_LOCATION
第 5 步:从 Airflow 1.10.* 环境存储分区转移其他数据
gcloud
将插件转移到您的 Airflow 2 环境。为此,请将插件从 Airflow 1.10.* 环境存储分区导出到 Airflow 2 环境存储分区的
/plugins
文件夹:gcloud composer environments storage plugins export \ --destination=AIRFLOW_2_BUCKET/plugins \ --environment=AIRFLOW_1_ENV \ --location=AIRFLOW_1_LOCATION
检查
/plugins
文件夹是否已成功导入:gcloud composer environments storage plugins list \ --environment=AIRFLOW_2_ENV \ --location=AIRFLOW_2_LOCATION
将
/data
文件夹从 Airflow 1.10.* 环境导出到 Airflow 2 环境:gcloud composer environments storage data export \ --destination=AIRFLOW_2_BUCKET/data \ --environment=AIRFLOW_1_ENV \ --location=AIRFLOW_1_LOCATION
检查
/data
文件夹是否已成功导入:gcloud composer environments storage data list \ --environment=AIRFLOW_2_ENV \ --location=AIRFLOW_2_LOCATION
第 6 步:转移连接和用户
Airflow 1.10.* 不支持导出用户和连接。如需转移用户和连接,请在 Airflow 2 环境中手动创建新的用户账号和连接。
gcloud
如需获取 Airflow 1.10.* 环境中的连接列表,请运行以下命令:
gcloud composer environments run AIRFLOW_1_ENV \ --location AIRFLOW_1_LOCATION \ connections -- --list
如需在 Airflow 2 环境中创建新连接,请通过 gcloud 运行
connections
Airflow CLI 命令。例如:gcloud composer environments run \ AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ connections add \ -- --conn-host postgres.example.com \ --conn-port 5432 \ --conn-type postgres \ --conn-login example_user \ --conn-password example_password \ --conn-description "Example connection" \ example_connection
如需查看 Airflow 1.10.* 环境中的用户列表,请执行以下操作:
为 Airflow 1.10.* 环境打开 Airflow 网页界面。
转到管理 > 用户。
要在 Airflow 2 环境中创建新的用户账号,请运行
users create
Airflow CLI 命令 通过 gcloud例如:gcloud composer environments run \ AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ users create \ -- --username example_username \ --firstname Example-Name \ --lastname Example-Surname \ --email example-user@example.com \ --use-random-password \ --role Admin
第 7 步:确保您的 DAG 已准备好使用 Airflow 2
在将 DAG 转移到 Airflow 2 环境之前,请确保:
您的 DAG 已成功运行,没有剩余的兼容性问题。
您的 DAG 会使用正确的导入语句。
例如,
BigQueryCreateDataTransferOperator
的新导入语句可能如下所示:from airflow.providers.google.cloud.operators.bigquery_dts \ import BigQueryCreateDataTransferOperator
您的 DAG 已针对 Airflow 2 升级。此变更与 Airflow 1.10.14 及更高版本兼容。
第 8 步:将 DAG 转移到 Airflow 2 环境
在环境之间转移 DAG 时,可能会出现以下问题:
如果两个环境中都启用了(未暂停)某个 DAG,则每个环境都会安排自己的 DAG 副本。这可能会导致在相同的数据和执行时间内并发运行 DAG。
由于 DAG 同步,Airflow 从 DAG 中指定的开始日期开始安排额外的 DAG 运行。这是因为新的 Airflow 实例不考虑从 1.10.* 环境运行的 DAG 的历史记录。这可能会导致从指定的开始日期开始安排大量 DAG 运行。
防止并发 DAG 运行
在您的 Airflow 2 环境中,替换 dags_are_paused_at_creation
Airflow 配置选项。进行此项更改后,默认情况下,所有新 DAG 都会暂停。
部分 | 键 | 值 |
---|---|---|
core |
dags_are_paused_at_creation |
True |
防止额外运行或缺失的 DAG 运行
在要转移到 Airflow 2 环境的 DAG 中,指定新的静态开始日期。
为避免执行日期出现间断和重叠,第一次 DAG 运行应在下一个安排时间间隔的 Airflow 2 环境中进行。为此,请将 DAG 中新的开始日期设置为早于 Airflow 1.10.* 环境中最后一次运行的日期。
例如,如果您在 Airflow 1.10.* 环境中的每天 15:00、17:00 和 21:00 运行 DAG,则上次 DAG 运行发生在 15:00,并且您计划于 15:15 转移此 DAG,那么 Airflow 2 环境的开始日期就是今天的 14:45。在 Airflow 2 环境中启用 DAG 后,Airflow 会安排在 17:00 运行 DAG。
另一个示例是,如果您的 DAG 在 Airflow 1.10.* 环境中的每一天 00:00 运行,则上次 DAG 运行发生在 2021 年 4 月 26 日 00:00,并且您计划在 2021 年 4 月 26 日 13:00 转移该 DAG,Airflow 2 环境的开始日期是 2021 年 4 月 25 日 23:45。在 Airflow 2 环境中启用 DAG 后,Airflow 将安排在 2021 年 4 月 27 日 00:00 运行 DAG。
将 DAG 逐个转移到 Airflow 2 环境
对于每个 DAG,请按照以下步骤转移它:
确保按照上一部分中的说明设置 DAG 中的新开始日期。
将更新的 DAG 上传到 Airflow 2 环境。由于配置替换,此 DAG 在 Airflow 2 环境中暂停,因此尚未安排任何 DAG 运行。
在 Airflow 网页界面中,转到 DAG 并查看报告的 DAG 语法错误。
当您计划转移 DAG 时,请执行以下操作:
暂停 Airflow 1.10.* 环境中的 DAG。
取消暂停 Airflow 2 环境中的 DAG。
检查新的 DAG 运行是否安排在正确的时间。
等待 DAG 运行在 Airflow 2 环境中发生,并检查运行是否成功。
根据 DAG 是否成功运行:
如果 DAG 运行成功,您可以继续在 Airflow 2 环境中使用 DAG。最后,考虑删除 Airflow 1.10.* 版本的 DAG。
如果 DAG 运行失败,请尝试排查 DAG 问题,直到它在 Airflow 2 中成功运行为止。
如果需要,您始终可以回退到 Airflow 1.10.* 版本的 DAG:
暂停 Airflow 2 环境中的 DAG。
在 Airflow 1.10.* 环境中恢复暂停的 DAG。这会为新的 DAG 运行安排与失败的 DAG 运行相同的日期和时间。
当您准备好继续使用 Airflow 2 版本的 DAG 时,请调整开始日期,将新版本的 DAG 上传到 Airflow 2 环境,然后重复上述过程。
第 9 步:监控 Airflow 2 环境
将所有 DAG 和配置转移到 Airflow 2 环境后,请监控其是否存在潜在问题、DAG 运行失败以及环境整体运行状况。如果 Airflow 2 环境在足够长的时间内正常运行,则可以移除 Airflow 1.10.* 环境。