将环境迁移到 Airflow 2

Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3

本页面介绍如何将 DAG,数据和配置从现有 Airflow 1.10.* 环境转移到具有 Airflow 2 及更高版本的 Airflow 环境。

其他迁移指南

收件人 方法 指南
Cloud Composer 1、Airflow 2 Cloud Composer 2、Airflow 2 并排显示,使用快照 迁移指南(快照)
Cloud Composer 1、Airflow 1 Cloud Composer 2、Airflow 2 并排,使用快照 迁移指南(快照)
Cloud Composer 1、Airflow 2 Cloud Composer 2、Airflow 2 并排手动传输 手动迁移指南
Cloud Composer 1、Airflow 1 Cloud Composer 2、Airflow 2 并排手动转移 手动迁移指南
Airflow 1 Airflow 2 并排手动转移 本指南(手动迁移)

并列升级

通过 Cloud Composer 提供的 Cloud Composer 数据库转移脚本,您可以将元数据数据库、DAG、数据和插件从使用 Airflow 1.10.14 和 Airflow 1.10.15 的 Cloud Composer 环境迁移至使用 Airflow 2.0.1 及更高版本的现有 Cloud Composer 环境。

这是本指南中介绍的路径的替代路径。在使用提供的脚本时,本指南的某些部分仍然适用。例如,您可能希望在迁移前检查 DAG 是否兼容 Airflow 2,或者确保并发 DAG 运行不会发生,且没有额外或缺失的 DAG 运行。

准备工作

在开始将 Cloud Composer 环境与 Airflow 2 结合使用之前,请考虑 Airflow 2 为 Cloud Composer 环境所做的更改。

调度器高可用性

您可以在您的环境中使用多个 Airflow 调度器。您可以在创建环境时设置调度器的数量,或者通过更新现有环境进行更改。

Celery+Kubernetes Executor

Airflow 2 Celery+Kubernetes 执行器 支持 Cloud Composer 3。

重大更改

Airflow 2 引入了许多重大更改,其中一些更改重大:

Airflow 2 和 Airflow 1.10 的环境之间存在差异.*

使用 Airflow 1.10.* 的 Cloud Composer 环境与使用 Airflow 2 的环境之间的主要区别:

  • 采用 Airflow 2 的环境使用 Python 3.8。此版本比 Airflow 1.10.* 环境中使用的版本更新。不支持 Python 2、Python 3.6 和 Python 3.7。
  • Airflow 2 使用不同的 CLI 格式。在使用 Airflow 2 的环境中,Cloud Composer 支持通过 gcloud composer environments run 命令使用新格式。
  • 在 Airflow 2 环境中,预安装的 PyPI 软件包有所不同。如需预安装 PyPI 软件包的列表,请参阅 Cloud Composer 版本列表
  • Airflow 2 中始终启用 DAG 序列化。因此,不再需要异步 DAG 加载,并且 Airflow 2 也不支持加载异步 DAG。因此,Airflow 2 不支持配置 [core]store_serialized_dags[core]store_dag_code 参数,尝试设置这些参数的操作会报告为错误。
  • 不支持 Airflow Web 服务器插件。这不会影响调度器或工作器插件,包括 Airflow 运算符和传感器。
  • 在 Airflow 2 环境中,默认 Airflow 用户角色Op。对于具有 Airflow 1.10.* 的环境,默认角色为 Admin

第 1 步:检查与 Airflow 2 的兼容性

如需检查与 Airflow 2 是否存在潜在冲突,请参阅“升级到 Airflow 2.0+ 指南中的 升级 DAG

您可能会遇到的一个常见问题与导入路径不兼容。如需详细了解如何解决此兼容性问题,请参阅 升级到 Airflow 2.0+ 指南,请参阅 关于向后移植提供程序的部分

第 2 步:创建 Airflow 2 环境,转移配置替换和环境变量

创建 Airflow 2 环境并转移配置替换和环境变量:

  1. 按照创建环境中的步骤操作。在创建环境之前,还要指定配置替换和环境变量,如下文所述。

  2. 选择映像时,请选择具有 Airflow 2 的映像

  3. 手动将配置参数从 Airflow 1.10.* 环境转移到新的 Airflow 2 环境。

    控制台

    1. 创建环境时,展开网络、Airflow 配置替换及其他功能部分。

    2. Airflow 配置替换下方,点击添加 Airflow 配置替换

    3. 从 Airflow 1.10.* 环境中复制所有配置替换。

      某些配置选项在 Airflow 2 中使用不同的名称和部分。如需了解详情,请参阅配置更改

    4. 环境变量下,点击添加环境变量

    5. 从 Airflow 1.10.* 环境中复制所有环境变量。

    6. 点击创建以创建环境。

第 3 步:将 PyPI 软件包安装到 Airflow 2 环境

创建 Airflow 2 环境后,在其中安装 PyPI 软件包:

控制台

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 选择您的 Airflow 2 环境。

  3. 转到 PyPI 软件包标签页,然后点击修改

  4. 从 Airflow 1.10.* 环境中复制 PyPI 软件包要求。点击保存,然后等待您的环境更新。

    由于 Airflow 2 环境使用了一组不同的预装软件包和不同的 Python 版本,因此可能会遇到 PyPI 软件包冲突,难以解决。如需诊断软件包依赖项问题,一种方法是在 Airflow 工作器 pod 中安装软件包来检查 PyPI 软件包错误。

第 4 步:将变量和池转移到 Airflow 2

Airflow 1.10.* 支持将变量和池导出到 JSON 文件。然后,您可以将这些文件导入您的 Airflow 2 环境。

您只有在拥有 default_pool 以外的自定义池时才需要转移池。否则,请跳过导出和导入池的命令。

gcloud

  1. 从 Airflow 1.10.* 环境中导出变量:

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         variables -- -e /home/airflow/gcs/data/variables.json
    

    替换:

    • AIRFLOW_1_ENV 替换为您的 Airflow 1.10.* 环境名称。
    • AIRFLOW_1_LOCATION 替换为环境所在的区域。
  2. 从 Airflow 1.10.* 环境中导出池:

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         pool -- -e /home/airflow/gcs/data/pools.json
    
  3. 获取 Airflow 2 环境存储分区 URI。

    1. 运行以下命令:

      gcloud composer environments describe AIRFLOW_2_ENV \
          --location AIRFLOW_2_LOCATION \
           --format="value(config.dagGcsPrefix)"
      

      替换:

      • AIRFLOW_2_ENV 替换为您的 Airflow 2 环境名称。
      • AIRFLOW_2_LOCATION 替换为环境所在的区域。
    2. 在输出中,移除 /dags 文件夹。结果是 Airflow 2 环境存储桶的 URI。

      例如,将 gs://us-central1-example-916807e1-bucket/dags 更改为 gs://us-central1-example-916807e1-bucket

  4. 将包含变量和池的 JSON 文件转移到您的 Airflow 2 环境:

    gcloud composer environments storage data export \
        --destination=AIRFLOW_2_BUCKET/data \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION \
        --source=variables.json
    
    gcloud composer environments storage data export \
        --destination=AIRFLOW_2_BUCKET/data \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION \
        --source=pools.json
    

    AIRFLOW_2_BUCKET 替换为在上一步中获得的 Airflow 2 环境存储分区的 URI。

  5. 将变量和池导入 Airflow 2:

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        variables import \
        -- /home/airflow/gcs/data/variables.json
    
    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        pools import \
        -- /home/airflow/gcs/data/pools.json
    
  6. 检查已导入的变量和池:

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        variables list
    
    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        pools list
    
  7. 从存储分区中移除 JSON 文件:

    gcloud composer environments storage data delete \
        variables.json \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
    gcloud composer environments storage data delete \
        pools.json \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
    gcloud composer environments storage data delete \
        variables.json \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    
    gcloud composer environments storage data delete \
        pools.json \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    

第 5 步:从 Airflow 1.10.* 环境存储分区转移其他数据

gcloud

  1. 插件转移到您的 Airflow 2 环境。为此,请将插件从 Airflow 1.10.* 环境存储分区导出到 Airflow 2 环境存储分区的 /plugins 文件夹:

    gcloud composer environments storage plugins export \
        --destination=AIRFLOW_2_BUCKET/plugins \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    
  2. 检查 /plugins 文件夹是否已成功导入:

    gcloud composer environments storage plugins list \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
  3. /data 文件夹从 Airflow 1.10.* 环境导出到 Airflow 2 环境:

        gcloud composer environments storage data export \
            --destination=AIRFLOW_2_BUCKET/data \
            --environment=AIRFLOW_1_ENV \
            --location=AIRFLOW_1_LOCATION
    
  4. 检查 /data 文件夹是否已成功导入:

    gcloud composer environments storage data list \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    

第 6 步:转移连接和用户

Airflow 1.10.* 不支持导出用户和连接。如需转移用户和连接,请在 Airflow 2 环境中手动创建新的用户账号和连接。

gcloud

  1. 如需获取 Airflow 1.10.* 环境中的连接列表,请运行以下命令:

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         connections -- --list
    
  2. 如需在 Airflow 2 环境中创建新连接,请通过 gcloud 运行 connections Airflow CLI 命令。例如:

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        connections add \
        -- --conn-host postgres.example.com \
        --conn-port 5432 \
        --conn-type postgres \
        --conn-login example_user \
        --conn-password example_password \
        --conn-description "Example connection" \
        example_connection
    
  3. 如需查看 Airflow 1.10.* 环境中的用户列表,请执行以下操作:

    1. 为 Airflow 1.10.* 环境打开 Airflow 网页界面

    2. 转到管理 > 用户

  4. 要在 Airflow 2 环境中创建新的用户账号,请运行 users create Airflow CLI 命令 通过 gcloud例如:

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        users create \
        -- --username example_username \
        --firstname Example-Name \
        --lastname Example-Surname \
        --email example-user@example.com \
        --use-random-password \
        --role Admin
    

第 7 步:确保您的 DAG 已准备好使用 Airflow 2

在将 DAG 转移到 Airflow 2 环境之前,请确保:

  1. 您的 DAG 已成功运行,没有剩余的兼容性问题。

  2. 您的 DAG 会使用正确的导入语句

    例如,BigQueryCreateDataTransferOperator 的新导入语句可能如下所示:

    from airflow.providers.google.cloud.operators.bigquery_dts \
        import BigQueryCreateDataTransferOperator
    
  3. 您的 DAG 已针对 Airflow 2 升级。此变更与 Airflow 1.10.14 及更高版本兼容。

第 8 步:将 DAG 转移到 Airflow 2 环境

在环境之间转移 DAG 时,可能会出现以下问题:

  • 如果两个环境中都启用了(未暂停)某个 DAG,则每个环境都会安排自己的 DAG 副本。这可能会导致在相同的数据和执行时间内并发运行 DAG。

  • 由于 DAG 同步,Airflow 从 DAG 中指定的开始日期开始安排额外的 DAG 运行。这是因为新的 Airflow 实例不考虑从 1.10.* 环境运行的 DAG 的历史记录。这可能会导致从指定的开始日期开始安排大量 DAG 运行。

防止并发 DAG 运行

在您的 Airflow 2 环境中,替换 dags_are_paused_at_creation Airflow 配置选项。进行此项更改后,默认情况下,所有新 DAG 都会暂停。

部分
core dags_are_paused_at_creation True

防止额外运行或缺失的 DAG 运行

在要转移到 Airflow 2 环境的 DAG 中,指定新的静态开始日期

为避免执行日期出现间断和重叠,第一次 DAG 运行应在下一个安排时间间隔的 Airflow 2 环境中进行。为此,请将 DAG 中新的开始日期设置为早于 Airflow 1.10.* 环境中最后一次运行的日期。

例如,如果您在 Airflow 1.10.* 环境中的每天 15:00、17:00 和 21:00 运行 DAG,则上次 DAG 运行发生在 15:00,并且您计划于 15:15 转移此 DAG,那么 Airflow 2 环境的开始日期就是今天的 14:45。在 Airflow 2 环境中启用 DAG 后,Airflow 会安排在 17:00 运行 DAG。

另一个示例是,如果您的 DAG 在 Airflow 1.10.* 环境中的每一天 00:00 运行,则上次 DAG 运行发生在 2021 年 4 月 26 日 00:00,并且您计划在 2021 年 4 月 26 日 13:00 转移该 DAG,Airflow 2 环境的开始日期是 2021 年 4 月 25 日 23:45。在 Airflow 2 环境中启用 DAG 后,Airflow 将安排在 2021 年 4 月 27 日 00:00 运行 DAG。

将 DAG 逐个转移到 Airflow 2 环境

对于每个 DAG,请按照以下步骤转移它:

  1. 确保按照上一部分中的说明设置 DAG 中的新开始日期。

  2. 将更新的 DAG 上传到 Airflow 2 环境。由于配置替换,此 DAG 在 Airflow 2 环境中暂停,因此尚未安排任何 DAG 运行。

  3. Airflow 网页界面中,转到 DAG 并查看报告的 DAG 语法错误。

  4. 当您计划转移 DAG 时,请执行以下操作:

    1. 暂停 Airflow 1.10.* 环境中的 DAG。

    2. 取消暂停 Airflow 2 环境中的 DAG。

    3. 检查新的 DAG 运行是否安排在正确的时间。

    4. 等待 DAG 运行在 Airflow 2 环境中发生,并检查运行是否成功。

  5. 根据 DAG 是否成功运行:

    • 如果 DAG 运行成功,您可以继续在 Airflow 2 环境中使用 DAG。最后,考虑删除 Airflow 1.10.* 版本的 DAG。

    • 如果 DAG 运行失败,请尝试排查 DAG 问题,直到它在 Airflow 2 中成功运行为止。

      如果需要,您始终可以回退到 Airflow 1.10.* 版本的 DAG:

      1. 暂停 Airflow 2 环境中的 DAG。

      2. 在 Airflow 1.10.* 环境中恢复暂停的 DAG。这会为新的 DAG 运行安排与失败的 DAG 运行相同的日期和时间。

      3. 当您准备好继续使用 Airflow 2 版本的 DAG 时,请调整开始日期,将新版本的 DAG 上传到 Airflow 2 环境,然后重复上述过程。

第 9 步:监控 Airflow 2 环境

将所有 DAG 和配置转移到 Airflow 2 环境后,请监控其是否存在潜在问题、DAG 运行失败以及环境整体运行状况。如果 Airflow 2 环境在足够长的时间内正常运行,则可以移除 Airflow 1.10.* 环境。

后续步骤