使用快照将环境迁移到 Cloud Composer 2(来自 Airflow 1)

Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3

本页面介绍如何将 DAG、数据和配置从现有 Cloud Composer 1、Airflow 1 环境转移到 Cloud Composer 2、Airflow 2。

本迁移指南使用了快照功能。

其他迁移指南

收件人 方法 指南
Cloud Composer 1、Airflow 2 Cloud Composer 2、Airflow 2 并排显示,使用快照 迁移指南(快照)
Cloud Composer 1、Airflow 1 Cloud Composer 2、Airflow 2 并排,使用快照 本指南(快照)
Cloud Composer 1、Airflow 2 Cloud Composer 2、Airflow 2 并排手动转移 手动迁移指南
Cloud Composer 1、Airflow 1 Cloud Composer 2、Airflow 2 并排手动转移 手动迁移指南
Airflow 1 Airflow 2 并排手动传输 手动迁移指南

准备工作

  • Cloud Composer 2 2.0.9 及更高版本支持快照。在 1.18.5 版中,Cloud Composer 1 支持保存环境快照。

  • Cloud Composer 支持从 Cloud Composer 1 并行迁移到 Cloud Composer 2。无法从 Cloud Composer 1 就地升级到 Cloud Composer 2。

  • 查看 Cloud Composer 1 和 Cloud Composer 2

  • 支持快照的 Airflow 数据库的大小上限为 20 GB。如果环境的数据库占用 超过 20 GB, 减小 Airflow 数据库的大小

  • 若要创建快照,环境存储桶中的 /dags/plugins/data 文件夹中的对象总数必须少于 10 万。

  • 如果您使用 XCom 机制传输文件,请确保按照 Airflow 准则使用该机制。使用 XCom 传输大文件或大量文件会产生以下影响 Airflow 数据库的性能,可能会导致加载失败 快照或升级您的环境请考虑使用如下替代方案 Cloud Storage 来传输大量数据。

  • 由于 Cloud Composer 2 使用 Airflow 2,因此迁移包括将 DAG 和环境配置切换为 Airflow 2。如需了解 Cloud Composer 的 Airflow 1 和 Airflow 2 之间的重大更改,请参阅从 Airflow 1 到 Airflow 2 的迁移指南

  • 在本指南中,您将迁移到 Airflow 2 和迁移到 Cloud Composer 2 合并到一个迁移过程中。通过这种方式,您无需在迁移到 Cloud Composer 2 之前使用 Airflow 2 迁移到 Cloud Composer 1 环境。

第 1 步:升级到 Airflow 1.10.15

如果您的环境使用低于 1.10.15 的 Airflow 版本,请将环境升级到使用 Airflow 1.10.15 且支持快照的 Cloud Composer 版本

第 2 步:检查与 Airflow 2 的兼容性

如需检查与 Airflow 2 的潜在冲突,请参阅《升级到 Airflow 2.0 及更高版本》指南的升级 DAG 部分。

您可能会遇到的一个常见问题与不兼容的导入有关 路径。如需详细了解如何解决此兼容性问题,请参阅 升级到 Airflow 2.0+ 指南,请参阅 关于向后移植提供程序的部分

第 3 步:确保您的 DAG 已准备好使用 Airflow 2

在将 DAG 转移到 Cloud Composer 2 环境之前,请确保:

  1. 您的 DAG 已成功运行,没有剩余的兼容性问题。

  2. 您的 DAG 会使用正确的导入语句

    例如,BigQueryCreateDataTransferOperator 的新导入语句可能如下所示:

    from airflow.providers.google.cloud.operators.bigquery_dts \
        import BigQueryCreateDataTransferOperator
    
  3. 您的 DAG 已针对 Airflow 2 升级。此变更与 Airflow 1.10.14 及更高版本兼容。

第 4 步:在 Cloud Composer 1 环境中暂停 DAG

为避免重复运行 DAG,请先暂停 Cloud Composer 1 环境中的所有 DAG,然后再保存其快照。

您可以使用以下任一选项:

  • Airflow 网页界面中, 前往 DAG 并手动暂停所有 DAG。

  • 使用 composer_dags 脚本暂停所有 DAG:

    python3 composer_dags.py --environment COMPOSER_1_ENV \
      --project PROJECT_ID \
      --location COMPOSER_1_LOCATION \
      --operation pause
    

    您需要将其中的:

    • COMPOSER_1_ENV 替换为 Cloud Composer 1 环境的名称。
    • PROJECT_ID 替换为项目 ID
    • COMPOSER_1_LOCATION 替换为环境所在的区域。

第 5 步:保存 Cloud Composer 1 环境的快照

控制台

创建环境的快照:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的 Cloud Composer 1 环境的名称。环境详情页面会打开。

  3. 点击创建快照

  4. 创建快照对话框中,点击提交。在本指南中,您将快照保存在 Cloud Composer 1 环境的存储桶中,但您也可以根据需要选择其他位置。

  5. 等待 Cloud Composer 创建快照。

gcloud

  1. 获取 Cloud Composer 1 环境的存储桶 URI:

    1. 运行以下命令:

      gcloud composer environments describe COMPOSER_1_ENV \
          --location COMPOSER_1_LOCATION \
           --format="value(config.dagGcsPrefix)"
      

      您需要将其中的:

      • COMPOSER_1_ENV 替换为您的 Cloud Composer 1 的名称 环境
      • COMPOSER_1_LOCATION 替换为环境所在的区域。
    2. 在输出中,移除 /dags 文件夹。结果是 Cloud Composer 1 环境的存储桶中。

      例如,将 gs://us-central1-example-916807e1-bucket/dags 更改为 gs://us-central1-example-916807e1-bucket

  2. 创建 Cloud Composer 1 环境的快照:

    gcloud composer environments snapshots save \
      COMPOSER_1_ENV \
      --location COMPOSER_1_LOCATION \
      --snapshot-location "COMPOSER_1_SNAPSHOTS_FOLDER"
    

    您需要将其中的:

    • COMPOSER_1_ENV 替换为 Cloud Composer 1 环境的名称。
    • COMPOSER_1_LOCATION 替换为 Cloud Composer 1 环境所在的区域。
    • COMPOSER_1_SNAPSHOTS_FOLDER 替换为您的 Cloud Composer 1 的 URI 环境的存储桶中。在本指南中,您需要将快照保存在 Cloud Composer 1 环境的存储桶,但您可以选择 。如果您指定了自定义位置,则两个环境的服务账号必须对指定位置拥有读取和写入权限。

第 6 步:创建 Cloud Composer 2 环境

创建 Cloud Composer 2 环境。您可以从符合预期资源需求的环境预设开始,然后再进一步扩缩和优化环境。

您无需指定配置替换项和环境变量,因为您稍后在加载 Cloud Composer 1 环境的快照时会替换它们。

Airflow 1 中的一些配置选项在 Airflow 2 中使用不同的名称和部分。如需了解详情,请参阅配置更改

第 7 步:将快照加载到 Cloud Composer 2 环境

控制台

如需将快照加载到 Cloud Composer 2 环境中,请执行以下操作:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击 Cloud Composer 2 环境。环境详情页面会打开。

  3. 点击加载快照

  4. 加载快照对话框中,点击浏览

  5. 选择包含快照的文件夹。如果您使用本指南中的默认位置,则此文件夹位于 Cloud Composer 1 环境存储桶中的 /snapshots 文件夹中,其名称为快照保存操作的时间戳。例如 us-central1-example-916807e1-bucket/snapshots_example-project_us-central1_example-environment/2022-01-05T18-59-00

  6. 点击加载,然后等待 Cloud Composer 加载快照。

gcloud

将 Cloud Composer 1 环境的快照加载到 Cloud Composer 2 环境:

gcloud composer environments snapshots load \
  COMPOSER_2_ENV \
  --location COMPOSER_2_LOCATION \
  --snapshot-path "SNAPSHOT_PATH"

您需要将其中的:

  • COMPOSER_2_ENV 替换为 Cloud Composer 2 环境的名称。
  • COMPOSER_2_LOCATION 替换为 Cloud Composer 2 环境所在的区域。
  • SNAPSHOT_PATH 替换为您的 Cloud Composer 1 的 URI 环境的存储桶,后跟快照的路径。例如,gs://us-central1-example-916807e1-bucket/snapshots/example-project_us-central1_example-environment_2022-01-05T18-59-00

第 8 步:在 Cloud Composer 2 环境中取消暂停 DAG

您可以使用以下任一选项:

  • Airflow 网页界面中,前往 DAG,然后逐个手动取消暂停所有 DAG。

  • 使用 composer_dags 脚本取消暂停所有 DAG:

      python3 composer_dags.py --environment COMPOSER_2_ENV \
      --project PROJECT_ID \
      --location COMPOSER_2_LOCATION \
      --operation unpause
    

    您需要将其中的:

    • COMPOSER_2_ENV 替换为 Cloud Composer 2 环境的名称。
    • PROJECT_ID 替换为项目 ID
    • COMPOSER_2_LOCATION 替换为环境所在的区域。

第 9 步:检查是否存在 DAG 错误

  1. Airflow 网页界面中,转到 DAG 并查看报告的 DAG 语法错误。

  2. 检查是否将 DAG 运行安排在正确的时间。

  3. 等待 DAG 在 Cloud Composer 2 环境中运行并检查运行是否成功。如果 DAG 运行成功,请不要在 Cloud Composer 1 环境中取消暂停该 DAG。如果这样做,则 DAG 会在 Cloud Composer 1 环境中以相同的时间和日期运行。

  4. 如果特定 DAG 运行失败,请尝试执行以下操作: 排查 DAG 问题,直到成功 在 Cloud Composer 2 中运行。

第 10 步:监控 Cloud Composer 2 环境

将所有 DAG 和配置转移到 Cloud Composer 后 监控其是否存在潜在问题、失败的 DAG 运行以及 环境健康状况

如果 Cloud Composer 2 环境在足够长的时间内正常运行,请考虑删除 Cloud Composer 1 环境。

后续步骤