Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
本页面介绍如何将 DAG、数据和配置从现有 Cloud Composer 1、Airflow 1 环境转移到 Cloud Composer 2、Airflow 2。
其他迁移指南
从 | 收件人 | 方法 | 指南 |
---|---|---|---|
Cloud Composer 1、Airflow 2 | Cloud Composer 2、Airflow 2 | 并排显示,使用快照 | 迁移指南(快照) |
Cloud Composer 1、Airflow 1 | Cloud Composer 2、Airflow 2 | 并排显示,使用快照 | 迁移指南(快照) |
Cloud Composer 1、Airflow 2 | Cloud Composer 2、Airflow 2 | 并排手动传输 | 手动迁移指南 |
Cloud Composer 1、Airflow 1 | Cloud Composer 2、Airflow 2 | 并排手动传输 | 本指南(手动迁移) |
Airflow 1 | Airflow 2 | 并排手动传输 | 手动迁移指南 |
准备工作
- Cloud Composer 支持从 Cloud Composer 1 并行迁移到 Cloud Composer 2。无法从 Cloud Composer 1 就地升级到 Cloud Composer 2。
- 请查看 Cloud Composer 1 和 Cloud Composer 2。
由于 Cloud Composer 2 使用 Airflow 2,因此迁移包括将 DAG 和环境配置切换为 Airflow 2。如需了解 Cloud Composer 的 Airflow 1 和 Airflow 2 之间的重大更改,请参阅从 Airflow 1 到 Airflow 2 的迁移指南。
在本指南中,您将迁移到 Airflow 2 和迁移到 Cloud Composer 2 合并到一个迁移过程中。通过这种方式,您无需在迁移到 Cloud Composer 2 之前使用 Airflow 2 迁移到 Cloud Composer 1 环境。
第 1 步:升级到 Airflow 1.10.15
如果环境使用低于 1.10.15 的 Airflow 版本,请将环境升级到使用 Airflow 1.10.15 的 Cloud Composer 版本。
第 2 步:检查与 Airflow 2 的兼容性
如需检查与 Airflow 2 是否存在潜在冲突,请参阅“升级到 Airflow 2.0+ 指南中的 升级 DAG。
您可能会遇到的一个常见问题与不兼容的导入有关 路径。如需详细了解如何解决此兼容性问题,请参阅 升级到 Airflow 2.0+ 指南,请参阅 关于向后移植提供程序的部分。
第 3 步:获取配置替换、自定义 PyPI 软件包和环境变量的列表
控制台
获取 Cloud Composer 1 环境的配置替换、自定义 PyPI 软件包和环境变量的列表:
前往 Google Cloud 控制台中的环境页面:
选择 Cloud Composer 1 环境。
在环境变量标签页上查看环境变量。
在 Airflow 配置替换标签页上查看配置替换。
在 PyPI 软件包标签页中查看自定义 PyPI 软件包。
gcloud
如需获取环境变量列表,请运行以下命令:
gcloud composer environments describe \
COMPOSER_1_ENV \
--location COMPOSER_1_LOCATION \
--format="value(config.softwareConfig.envVariables)"
如需获取环境的 Airflow 配置替换列表,请运行以下命令:
gcloud composer environments describe \
COMPOSER_1_ENV \
--location COMPOSER_1_LOCATION \
--format="value(config.softwareConfig.airflowConfigOverrides)"
如需获取自定义 PyPI 软件包的列表,请运行以下命令:
gcloud composer environments describe \
COMPOSER_1_ENV \
--location COMPOSER_1_LOCATION \
--format="value(config.softwareConfig.pypiPackages)"
替换:
- 将
COMPOSER_1_ENV
替换为 Cloud Composer 1 环境的名称。 - 将
COMPOSER_1_LOCATION
替换为 Cloud Composer 1 环境所在的区域。
Terraform
跳过此步骤。Cloud Composer 1 环境的配置已列出了您的环境的配置替换、自定义 PyPI 软件包和环境变量。
第 4 步:创建 Cloud Composer 2 环境
在此步骤中,创建一个 Cloud Composer 2 环境。您可以从符合预期资源需求的环境预设开始,然后再进一步扩缩和优化环境。
控制台
创建 Cloud Composer 2 环境并指定配置替换和环境变量。
或者,您可以在创建环境后替换 Airflow 配置和环境变量。
Airflow 1 中的一些配置选项在 Airflow 2 中使用不同的名称和部分。如需了解详情,请参阅配置更改。
gcloud
创建 Cloud Composer 2 环境并指定配置替换和环境变量。
或者,您可以在创建环境后替换 Airflow 配置和环境变量。
Airflow 1 中的一些配置选项在 Airflow 2 中使用不同的名称和部分。如需了解详情,请参阅配置更改。
Terraform
根据 Cloud Composer 1 环境的配置创建 Cloud Composer 2 环境:
- 复制 Cloud Composer 1 环境的配置。
- 更改环境的名称。
使用
google-beta
提供方:resource "google_composer_environment" "example_environment_composer_2" { provider = google-beta # ... }
在
config.software_config
块中指定 Cloud Composer 2 映像:software_config { image_version = "composer-2.9.7-airflow-2.9.3" # ... }
如果尚未指定配置替换和环境变量,请指定。
在
config.software_config.pypi_packages
块中指定自定义 PyPI 软件包:software_config { # ... pypi_packages = { numpy = "" scipy = ">=1.1.0" } }
第 5 步:将 PyPI 软件包安装到 Cloud Composer 2 环境中
创建 Cloud Composer 2 环境后,请向其中安装自定义 PyPI 软件包。
控制台
前往 Google Cloud 控制台中的环境页面:
选择 Cloud Composer 2 环境。
转到 PyPI 软件包标签页,然后点击修改。
从 Cloud Composer 1 环境复制 PyPI 软件包要求。点击保存,然后等待您的环境更新。
gcloud
使用自定义 PyPI 软件包列表创建一个
requirements.txt
文件:numpy scipy>=1.1.0
更新环境并将
requirements.txt
文件传递给--update-pypi-packages-from-file
命令:gcloud composer environments update COMPOSER_2_ENV \ --location COMPOSER_2_LOCATION \ --update-pypi-packages-from-file requirements.txt
替换:
- 将
COMPOSER_2_ENV
替换为 Cloud Composer 2 环境的名称。 - 将
COMPOSER_2_LOCATION
替换为 Cloud Composer 2 环境所在的区域。
- 将
Terraform
跳过此步骤。创建环境时,您已经安装了自定义 PyPI 软件包。
第 6 步:转移变量和池
Airflow 支持将变量和池导出到 JSON 文件。然后,您可以将这些文件导入 Cloud Composer 2 环境。
此步骤中使用的 Airflow CLI 命令适用于 Airflow 工作器中的本地文件。如需上传或下载文件,请使用环境的 Cloud Storage 存储桶中的 /data
文件夹。此文件夹会同步到 Airflow 工作器中的 /home/airflow/gcs/data/
目录。在 Airflow CLI 命令的参数 FILEPATH
中指定 /home/airflow/gcs/data/
。
gcloud
从 Cloud Composer 1 环境中导出变量:
gcloud composer environments run \ COMPOSER_1_ENV \ --location COMPOSER_1_LOCATION \ variables -- -e /home/airflow/gcs/data/variables.json
替换:
- 将
COMPOSER_1_ENV
替换为 Cloud Composer 1 环境的名称。 - 将
COMPOSER_1_LOCATION
替换为 Cloud Composer 1 环境所在的区域。
- 将
从 Cloud Composer 1 环境中导出池:
gcloud composer environments run COMPOSER_1_ENV \ --location COMPOSER_1_LOCATION \ pool -- -e /home/airflow/gcs/data/pools.json
替换:
- 将
COMPOSER_1_ENV
替换为 Cloud Composer 1 环境的名称。 - 将
COMPOSER_1_LOCATION
替换为 Cloud Composer 1 环境所在的区域。
- 将
获取 Cloud Composer 2 环境的存储桶 URI。
运行以下命令:
gcloud composer environments describe COMPOSER_2_ENV \ --location COMPOSER_2_LOCATION \ --format="value(config.dagGcsPrefix)"
替换:
- 将
COMPOSER_2_ENV
替换为 Cloud Composer 2 环境的名称。 - 将
COMPOSER_2_LOCATION
替换为环境所在的区域。
- 将
在输出中,移除
/dags
文件夹。结果是 Cloud Composer 2 环境存储桶的 URI。例如,将
gs://us-central1-example-916807e1-bucket/dags
更改为gs://us-central1-example-916807e1-bucket
。
将包含变量和池的 JSON 文件转移到 Cloud Composer 2 环境:
gcloud composer environments storage data export \ --destination=COMPOSER_2_BUCKET/data \ --environment=COMPOSER_1_ENV \ --location=COMPOSER_1_LOCATION \ --source=variables.json
gcloud composer environments storage data export \ --destination=COMPOSER_2_BUCKET/data \ --environment=COMPOSER_1_ENV \ --location=COMPOSER_1_LOCATION \ --source=pools.json
替换:
- 将
COMPOSER_2_BUCKET
替换为在上一步获取的 Cloud Composer 2 环境存储桶的 URI。 - 将
COMPOSER_1_ENV
替换为 Cloud Composer 1 环境的名称。 - 将
COMPOSER_1_LOCATION
替换为 Cloud Composer 1 环境所在的区域。
- 将
将变量和池导入 Cloud Composer 2:
gcloud composer environments run \ COMPOSER_2_ENV \ --location COMPOSER_2_LOCATION \ variables import \ -- /home/airflow/gcs/data/variables.json
gcloud composer environments run \ COMPOSER_2_ENV \ --location COMPOSER_2_LOCATION \ pools import \ -- /home/airflow/gcs/data/pools.json
检查已导入的变量和池:
gcloud composer environments run \ COMPOSER_2_ENV \ --location COMPOSER_2_LOCATION \ variables list
gcloud composer environments run \ COMPOSER_2_ENV \ --location COMPOSER_2_LOCATION \ pools list
从存储分区中移除 JSON 文件:
gcloud composer environments storage data delete \ variables.json \ --environment=COMPOSER_2_ENV \ --location=COMPOSER_2_LOCATION
gcloud composer environments storage data delete \ pools.json \ --environment=COMPOSER_2_ENV \ --location=COMPOSER_2_LOCATION
gcloud composer environments storage data delete \ variables.json \ --environment=COMPOSER_1_ENV \ --location=COMPOSER_1_LOCATION
gcloud composer environments storage data delete \ pools.json \ --environment=COMPOSER_1_ENV \ --location=COMPOSER_1_LOCATION
第 7 步:从 Cloud Composer 1 环境的存储桶转移其他数据
从 Cloud Composer 1 环境的存储桶转移插件和其他数据。
gcloud
将插件转移到 Cloud Composer 2 环境。为此,请将插件从 Cloud Composer 1 环境的存储桶导出到 Cloud Composer 2 环境存储桶的
/plugins
文件夹中:gcloud composer environments storage plugins export \ --destination=COMPOSER_2_BUCKET/plugins \ --environment=COMPOSER_1_ENV \ --location=COMPOSER_1_LOCATION
检查
/plugins
文件夹是否已成功导入:gcloud composer environments storage plugins list \ --environment=COMPOSER_2_ENV \ --location=COMPOSER_2_LOCATION
将
/data
文件夹从 Cloud Composer 1 环境导出到 Airflow 2 环境:gcloud composer environments storage data export \ --destination=COMPOSER_2_BUCKET/data \ --environment=COMPOSER_1_ENV \ --location=COMPOSER_1_LOCATION
检查
/data
文件夹是否已成功导入:gcloud composer environments storage data list \ --environment=COMPOSER_2_ENV \ --location=COMPOSER_2_LOCATION
第 8 步:转移连接
Airflow 1.10.15 不支持导出连接。如需转移连接,请在 Cloud Composer 2 环境中通过 Cloud Composer 1 环境手动创建连接。
gcloud
如需获取 Cloud Composer 1 环境中的连接列表,请运行以下命令:
gcloud composer environments run COMPOSER_1_ENV \ --location COMPOSER_1_LOCATION \ connections -- --list
如需在 Cloud Composer 2 环境中创建新连接,请通过
gcloud
运行connections
Airflow CLI 命令。例如:gcloud composer environments run \ COMPOSER_2_ENV \ --location COMPOSER_2_LOCATION \ connections add \ -- --conn-host postgres.example.com \ --conn-port 5432 \ --conn-type postgres \ --conn-login example_user \ --conn-password example_password \ --conn-description "Example connection" \ example_connection
第 9 步:转移用户账号
此步骤说明如何通过手动创建用户来转移用户。
Airflow 1.10.15 不支持导出用户。如需转移用户和连接,请在 Airflow 2 环境中通过 Cloud Composer 1 环境手动创建新的用户账号。
Airflow 界面
如需查看 Cloud Composer 1 环境中的用户列表,请执行以下操作:
转到管理 > 用户。
如需在 Cloud Composer 2 环境中创建用户,请执行以下操作:
转到安全性 > 列出用户。
点击添加新记录。
gcloud
-
无法在 Airflow 1 中通过
gcloud
查看用户列表。请使用 Airflow 界面。 如需在 Cloud Composer 2 环境中创建新的用户账号,请运行
users create
Airflow CLI 命令 至gcloud
。例如:gcloud composer environments run \ COMPOSER_2_ENV \ --location COMPOSER_2_LOCATION \ users create \ -- --username example_username \ --firstname Example-Name \ --lastname Example-Surname \ --email example-user@example.com \ --use-random-password \ --role Op
替换:
- 将
COMPOSER_2_ENV
替换为 Cloud Composer 2 环境的名称。 - 将
COMPOSER_2_LOCATION
替换为 Cloud Composer 2 环境所在的区域。 - Cloud Composer 1 环境中的所有用户配置参数及其值,包括用户的角色。
- 将
第 10 步:确保您的 DAG 已准备好使用 Airflow 2
将 DAG 转移到 Cloud Composer 1 环境之前,请确保:
您的 DAG 已成功运行,没有剩余的兼容性问题。
您的 DAG 会使用正确的导入语句。
例如,
BigQueryCreateDataTransferOperator
的新导入语句可能如下所示:from airflow.providers.google.cloud.operators.bigquery_dts \ import BigQueryCreateDataTransferOperator
您的 DAG 已针对 Airflow 2 升级。此变更与 Airflow 1.10.14 及更高版本兼容。
第 11 步:将 DAG 转移到 Cloud Composer 2 环境
在环境之间转移 DAG 时,可能会出现以下问题:
如果两个环境中都启用了(未暂停)某个 DAG,则每个环境都会安排自己的 DAG 副本。这可能会导致针对相同的数据和执行时间重复运行 DAG。
由于 DAG 同步,Airflow 从 DAG 中指定的开始日期开始安排额外的 DAG 运行。之所以发生这种情况,是因为新的 Airflow 实例不考虑从 Cloud Composer 1 环境运行 DAG 的历史记录。这可能会导致从指定的开始日期开始安排大量 DAG 运行。
防止重复的 DAG 运行
在 Cloud Composer 2 环境中或者在 Airflow 2 环境中为 dags_are_paused_at_creation
选项添加 Airflow 配置选项替换。进行此项更改后,默认情况下,所有新 DAG 都会暂停。
部分 | 键 | 值 |
---|---|---|
core |
dags_are_paused_at_creation |
True |
防止额外运行或缺失的 DAG 运行
为避免执行日期出现间隔和重叠,请在 Cloud Composer 2 中停用同步。通过这种方式,您在将 DAG 上传到 Cloud Composer 2 环境后,Airflow 不会安排已在 Cloud Composer 1 环境中进行的 DAG 运行作业。为 catchup_by_default
选项添加 Airflow 配置选项替换:
部分 | 键 | 值 |
---|---|---|
scheduler |
catchup_by_default |
False |
将 DAG 转移到 Cloud Composer 2 环境
如需将 DAG 转移到 Cloud Composer 2 环境,请执行以下操作:
将 DAG 从 Cloud Composer 1 环境上传到 Cloud Composer 2 环境。跳过
airflow_monitoring.py
DAG。由于配置替换,DAG 在 Cloud Composer 2 环境中被暂停,因此未安排任何 DAG 运行。
在 Airflow 网页界面中,转到 DAG 并查看报告的 DAG 语法错误。
当您计划转移 DAG 时,请执行以下操作:
在 Cloud Composer 1 环境中暂停 DAG。
在 Cloud Composer 2 环境中取消暂停 DAG。
检查新的 DAG 运行是否安排在正确的时间。
等待 DAG 在 Cloud Composer 2 环境中运行并检查运行是否成功。如果 DAG 运行成功,请不要在 Cloud Composer 1 环境中取消暂停该 DAG。如果这样做,则 DAG 会在 Cloud Composer 1 环境中以相同的时间和日期运行。
如果特定 DAG 运行失败,请尝试对该 DAG 进行问题排查,直到它在 Cloud Composer 2 中成功运行为止。
如果需要,您可以随时回退到 DAG 的 Cloud Composer 1 版本,并从 Cloud Composer 1 环境执行在 Cloud Composer 2 中失败的 DAG 运行:
在 Cloud Composer 2 环境中暂停 DAG。
在 Cloud Composer 1 环境中取消暂停 DAG。这安排了 DAG 在 Cloud Composer 1 环境中暂停时的 DAG 运行情况。
第 12 步:监控 Cloud Composer 2 环境
将所有 DAG 和配置转移到 Cloud Composer 2 环境后,请监控其是否存在潜在问题、DAG 运行失败以及环境整体运行状况。如果 Cloud Composer 2 环境在足够长的时间内正常运行,请考虑删除 Cloud Composer 1 环境。