将环境迁移到 Airflow 2

Cloud Composer 1 | Cloud Composer 2

本页面介绍如何将 DAG,数据和配置从现有 Airflow 1.10.* 环境转移到具有 Airflow 2 及更高版本的 Airflow 环境。

其他迁移指南

From To 方法 指南
Cloud Composer 1、Airflow 2 Cloud Composer 2、Airflow 2 并排,使用快照 迁移指南(快照)
Cloud Composer 1、Airflow 1 Cloud Composer 2、Airflow 2 并排,使用快照 迁移指南(快照)
Cloud Composer 1、Airflow 2 Cloud Composer 2、Airflow 2 并排、手动转账 手动迁移指南
Cloud Composer 1、Airflow 1 Cloud Composer 2、Airflow 2 并排、手动转账 手动迁移指南
Airflow 1 Airflow 2 并排、手动转账 本指南(手动迁移)

并列升级

通过 Cloud Composer 提供的 Cloud Composer 数据库转移脚本,您可以将元数据数据库、DAG、数据和插件从使用 Airflow 1.10.14 和 Airflow 1.10.15 的 Cloud Composer 环境迁移至使用 Airflow 2.0.1 及更高版本的现有 Cloud Composer 环境。

这是本指南中介绍的路径的替代路径。在使用提供的脚本时,本指南的某些部分仍然适用。例如,您可能希望在迁移前检查 DAG 是否兼容 Airflow 2,或者确保并发 DAG 运行不会发生,且没有额外或缺失的 DAG 运行。

准备工作

在开始将 Cloud Composer 环境与 Airflow 2 结合使用之前,请考虑 Airflow 2 为 Cloud Composer 环境所做的更改。

调度器高可用性

您可以在您的环境中使用多个 Airflow 调度器。您可以在创建环境时设置调度器的数量,或者通过更新现有环境进行更改。

Celery+Kubernetes Executor

当前版本的 Cloud Composer 不支持 Airflow 2 Celery+Kubernetes Executor

重大更改

Airflow 2 引入了许多重大更改,其中一些更改重大:

Airflow 2 和 Airflow 1.10 的环境之间存在差异.*

使用 Airflow 1.10.* 的 Cloud Composer 环境与使用 Airflow 2 的环境之间的主要区别:

  • 采用 Airflow 2 的环境使用 Python 3.8。此版本比 Airflow 1.10.* 环境中使用的版本更新。不支持 Python 2、Python 3.6 和 Python 3.7。
  • Airflow 2 使用不同的 CLI 格式。在使用 Airflow 2 的环境中,Cloud Composer 支持通过 gcloud composer environments run 命令使用新格式。
  • 在 Airflow 2 环境中,预安装的 PyPI 软件包有所不同。如需预安装 PyPI 软件包的列表,请参阅 Cloud Composer 版本列表
  • Airflow 2 中始终启用 DAG 序列化。因此,不再需要异步 DAG 加载,并且 Airflow 2 也不支持加载异步 DAG。因此,Airflow 2 不支持配置 [core]store_serialized_dags[core]store_dag_code 参数,尝试设置这些参数的操作会报告为错误。
  • 不支持 Airflow Web 服务器插件。这不会影响调度器或工作器插件,包括 Airflow 运算符和传感器。
  • 在 Airflow 2 环境中,默认 Airflow 用户角色Op。对于具有 Airflow 1.10.* 的环境,默认角色为 Admin

第 1 步:检查与 Airflow 2 的兼容性

如需检查与 Airflow 2 的潜在冲突,请使用现有 Airflow 1.10.* 环境中由 Airflow 提供的升级检查脚本。

gcloud

  1. 如果您的环境使用 Airflow 1.10.14 及更早版本,请将您的环境升级到使用 Airflow 1.10.15 及更高版本的 Cloud Composer 版本。从 Airflow 1.10.15 开始,Cloud Composer 支持升级检查命令。

  2. 通过 gcloud composer environments run 命令运行升级检查。一些与独立 Airflow 1.10.15 相关的升级检查与 Cloud Composer 无关。以下命令排除了这些检查。

    gcloud composer environments run \
        AIRFLOW_1_ENV  \
        --location=AIRFLOW_1_LOCATION \
        upgrade_check \
        -- --ignore VersionCheckRule --ignore LoggingConfigurationRule \
        --ignore PodTemplateFileRule --ignore SendGridEmailerMovedRule
    

    替换:

    • AIRFLOW_1_ENV 替换为您的 Airflow 1.10.* 环境名称。
    • AIRFLOW_1_LOCATION 替换为环境所在的区域。
  3. 检查该命令的输出。更新检查脚本会报告现有环境中的潜在兼容性问题。

  4. 实现对 DAG 的其他更改,如《升级到 Airflow 2.0 及更高版本》指南的升级 DAG 部分中所述。

第 2 步:创建 Airflow 2 环境,转移配置替换和环境变量

创建 Airflow 2 环境并转移配置替换和环境变量:

  1. 按照创建环境中的步骤操作。在创建环境之前,还要指定配置替换和环境变量,如下文所述。

  2. 选择映像时,请选择具有 Airflow 2 的映像

  3. 手动将配置参数从 Airflow 1.10.* 环境转移到新的 Airflow 2 环境。

    控制台

    1. 创建环境时,展开网络、Airflow 配置替换及其他功能部分。

    2. Airflow 配置替换下方,点击添加 Airflow 配置替换

    3. 从 Airflow 1.10.* 环境中复制所有配置替换。

      某些配置选项在 Airflow 2 中使用不同的名称和部分。如需了解详情,请参阅配置更改

    4. 环境变量下,点击添加环境变量

    5. 从 Airflow 1.10.* 环境中复制所有环境变量。

    6. 点击创建以创建环境。

第 3 步:将 PyPI 软件包安装到 Airflow 2 环境

创建 Airflow 2 环境后,在其中安装 PyPI 软件包:

控制台

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 选择您的 Airflow 2 环境。

  3. 转到 PyPI 软件包标签页,然后点击修改

  4. 从 Airflow 1.10.* 环境中复制 PyPI 软件包要求。点击保存,然后等待您的环境更新。

    由于 Airflow 2 环境使用了一组不同的预装软件包和不同的 Python 版本,因此可能会遇到 PyPI 软件包冲突,难以解决。如需诊断软件包依赖项问题,一种方法是在 Airflow 工作器 pod 中安装软件包来检查 PyPI 软件包错误。

第 4 步:将变量和池转移到 Airflow 2

Airflow 1.10.* 支持将变量和池导出到 JSON 文件。然后,您可以将这些文件导入您的 Airflow 2 环境。

您只有在拥有 default_pool 以外的自定义池时才需要转移池。否则,请跳过导出和导入池的命令。

gcloud

  1. 从 Airflow 1.10.* 环境中导出变量:

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         variables -- -e /home/airflow/gcs/data/variables.json
    

    您需要将其中的:

    • AIRFLOW_1_ENV 替换为您的 Airflow 1.10.* 环境名称。
    • AIRFLOW_1_LOCATION 替换为环境所在的区域。
  2. 从 Airflow 1.10.* 环境中导出池:

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         pool -- -e /home/airflow/gcs/data/pools.json
    
  3. 获取 Airflow 2 环境存储桶 URI。

    1. 运行以下命令:

      gcloud composer environments describe AIRFLOW_2_ENV \
          --location AIRFLOW_2_LOCATION \
           --format="value(config.dagGcsPrefix)"
      

      您需要将其中的:

      • AIRFLOW_2_ENV 替换为您的 Airflow 2 环境名称。
      • AIRFLOW_2_LOCATION 替换为环境所在的区域。
    2. 在输出中,移除 /dags 文件夹。结果是 Airflow 2 环境存储桶的 URI。

      例如,将 gs://us-central1-example-916807e1-bucket/dags 更改为 gs://us-central1-example-916807e1-bucket

  4. 将包含变量和池的 JSON 文件转移到您的 Airflow 2 环境:

    gcloud composer environments storage data export \
        --destination=AIRFLOW_2_BUCKET/data \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION \
        --source=variables.json
    
    gcloud composer environments storage data export \
        --destination=AIRFLOW_2_BUCKET/data \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION \
        --source=pools.json
    

    AIRFLOW_2_BUCKET 替换为在上一步中获得的 Airflow 2 环境存储桶的 URI。

  5. 将变量和池导入 Airflow 2:

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        variables import \
        -- /home/airflow/gcs/data/variables.json
    
    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        pools import \
        -- /home/airflow/gcs/data/pools.json
    
  6. 检查已导入的变量和池:

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        variables list
    
    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        pools list
    
  7. 从存储分区中移除 JSON 文件:

    gcloud composer environments storage data delete \
        variables.json \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
    gcloud composer environments storage data delete \
        pools.json \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
    gcloud composer environments storage data delete \
        variables.json \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    
    gcloud composer environments storage data delete \
        pools.json \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    

第 5 步:从 Airflow 1.10.* 环境存储桶转移其他数据

gcloud

  1. 插件转移到您的 Airflow 2 环境。为此,请将插件从 Airflow 1.10.* 环境存储桶导出到 Airflow 2 环境存储桶的 /plugins 文件夹:

    gcloud composer environments storage plugins export \
        --destination=AIRFLOW_2_BUCKET/plugins \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    
  2. 检查 /plugins 文件夹是否已成功导入:

    gcloud composer environments storage plugins list \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
  3. /data 文件夹从 Airflow 1.10.* 环境导出到 Airflow 2 环境:

        gcloud composer environments storage data export \
            --destination=AIRFLOW_2_BUCKET/data \
            --environment=AIRFLOW_1_ENV \
            --location=AIRFLOW_1_LOCATION
    
  4. 检查 /data 文件夹是否已成功导入:

    gcloud composer environments storage data list \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    

第 6 步:转移连接和用户

Airflow 1.10.* 不支持导出用户和连接。如需转移用户和连接,请在 Airflow 2 环境中手动创建新的用户账号和连接。

gcloud

  1. 如需获取 Airflow 1.10.* 环境中的连接列表,请运行以下命令:

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         connections -- --list
    
  2. 如需在 Airflow 2 环境中创建新连接,请通过 gcloud 运行 connections Airflow CLI 命令。例如:

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        connections add \
        -- --conn-host postgres.example.com \
        --conn-port 5432 \
        --conn-type postgres \
        --conn-login example_user \
        --conn-password example_password \
        --conn-description "Example connection" \
        example_connection
    
  3. 如需查看 Airflow 1.10.* 环境中的用户列表,请执行以下操作:

    1. 为 Airflow 1.10.* 环境打开 Airflow 网页界面

    2. 转到管理 > 用户

  4. 如需在 Airflow 2 环境中创建新的用户账号,请通过 gcloud 运行 users create Airflow CLI 命令。例如:

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        users create \
        -- --username example_username \
        --firstname Example-Name \
        --lastname Example-Surname \
        --email example-user@example.com \
        --use-random-password \
        --role Admin
    

第 7 步:确保您的 DAG 已准备好使用 Airflow 2

在将 DAG 转移到 Airflow 2 环境之前,请确保:

  1. 您的 DAG 升级检查脚本已成功运行,没有剩余的兼容性问题。

  2. 您的 DAG 会使用正确的导入语句

    例如,BigQueryCreateDataTransferOperator 的新导入语句可能如下所示:

    from airflow.providers.google.cloud.operators.bigquery_dts \
        import BigQueryCreateDataTransferOperator
    
  3. 您的 DAG 已针对 Airflow 2 升级。此变更与 Airflow 1.10.14 及更高版本兼容。

第 8 步:将 DAG 转移到 Airflow 2 环境

在环境之间转移 DAG 时,可能会出现以下问题:

  • 如果两个环境中都启用了(未暂停)某个 DAG,则每个环境都会安排自己的 DAG 副本。这可能会导致在相同的数据和执行时间内并发运行 DAG。

  • 由于 DAG 同步,Airflow 从 DAG 中指定的开始日期开始安排额外的 DAG 运行。这是因为新的 Airflow 实例不考虑从 1.10.* 环境运行的 DAG 的历史记录。这可能会导致从指定的开始日期开始安排大量 DAG 运行。

防止并发 DAG 运行

在您的 Airflow 2 环境中,替换 dags_are_paused_at_creation Airflow 配置选项。进行此项更改后,默认情况下,所有新 DAG 都会暂停。

部分
core dags_are_paused_at_creation True

防止额外运行或缺失的 DAG 运行

在要转移到 Airflow 2 环境的 DAG 中,指定新的静态开始日期

为避免执行日期出现间断和重叠,第一次 DAG 运行应在下一个安排时间间隔的 Airflow 2 环境中进行。为此,请将 DAG 中新的开始日期设置为早于 Airflow 1.10.* 环境中最后一次运行的日期。

例如,如果您在 Airflow 1.10.* 环境中的每天 15:00、17:00 和 21:00 运行 DAG,则上次 DAG 运行发生在 15:00,并且您计划于 15:15 转移此 DAG,那么 Airflow 2 环境的开始日期就是今天的 14:45。在 Airflow 2 环境中启用 DAG 后,Airflow 会安排在 17:00 运行 DAG。

另一个示例是,如果您的 DAG 在 Airflow 1.10.* 环境中的每一天 00:00 运行,则上次 DAG 运行发生在 2021 年 4 月 26 日 00:00,并且您计划在 2021 年 4 月 26 日 13:00 转移该 DAG,Airflow 2 环境的开始日期是 2021 年 4 月 25 日 23:45。在 Airflow 2 环境中启用 DAG 后,Airflow 将安排在 2021 年 4 月 27 日 00:00 运行 DAG。

将 DAG 逐个转移到 Airflow 2 环境

对于每个 DAG,请按照以下步骤转移它:

  1. 确保按照上一部分中的说明设置 DAG 中的新开始日期。

  2. 将更新的 DAG 上传到 Airflow 2 环境。由于配置替换,此 DAG 在 Airflow 2 环境中暂停,因此尚未安排任何 DAG 运行。

  3. Airflow 网页界面中,转到 DAG 并查看报告的 DAG 语法错误。

  4. 当您计划转移 DAG 时,请执行以下操作:

    1. 暂停 Airflow 1.10.* 环境中的 DAG。

    2. 取消暂停 Airflow 2 环境中的 DAG。

    3. 检查新的 DAG 运行是否安排在正确的时间。

    4. 等待 DAG 运行在 Airflow 2 环境中发生,并检查运行是否成功。

  5. 根据 DAG 是否成功运行:

    • 如果 DAG 运行成功,您可以继续在 Airflow 2 环境中使用 DAG。最后,考虑删除 Airflow 1.10.* 版本的 DAG。

    • 如果 DAG 运行失败,请尝试排查 DAG 问题,直到它在 Airflow 2 中成功运行为止。

      如果需要,您始终可以回退到 Airflow 1.10.* 版本的 DAG:

      1. 暂停 Airflow 2 环境中的 DAG。

      2. 在 Airflow 1.10.* 环境中恢复暂停的 DAG。这会为新的 DAG 运行安排与失败的 DAG 运行相同的日期和时间。

      3. 当您准备好继续使用 Airflow 2 版本的 DAG 时,请调整开始日期,将新版本的 DAG 上传到 Airflow 2 环境,然后重复上述过程。

第 9 步:监控 Airflow 2 环境

将所有 DAG 和配置转移到 Airflow 2 环境后,请监控其是否存在潜在问题、DAG 运行失败以及环境整体运行状况。如果 Airflow 2 环境在足够长的时间内正常运行,则可以移除 Airflow 1.10.* 环境。

后续步骤