将环境迁移到 Cloud Composer 2(从 Airflow 1 迁移)

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

本页面介绍如何将 DAG、数据和配置从现有 Cloud Composer 1、Airflow 1 环境转移到 Cloud Composer 2、Airflow 2。

其他迁移指南

收件人 方法 指南
Cloud Composer 1、Airflow 2 Cloud Composer 2、Airflow 2 并排显示,使用快照 迁移指南(快照)
Cloud Composer 1、Airflow 1 Cloud Composer 2、Airflow 2 并排显示,使用快照 迁移指南(快照)
Cloud Composer 1、Airflow 2 Cloud Composer 2、Airflow 2 并排手动传输 手动迁移指南
Cloud Composer 1、Airflow 1 Cloud Composer 2、Airflow 2 并排手动传输 本指南(手动迁移)
Airflow 1 Airflow 2 并排手动传输 手动迁移指南

准备工作

  • Cloud Composer 支持从 Cloud Composer 1 并行迁移到 Cloud Composer 2。无法从 Cloud Composer 1 就地升级到 Cloud Composer 2。
  • 请查看 Cloud Composer 1 和 Cloud Composer 2
  • 由于 Cloud Composer 2 使用 Airflow 2,因此迁移包括将 DAG 和环境配置切换为 Airflow 2。如需了解 Cloud Composer 的 Airflow 1 和 Airflow 2 之间的重大更改,请参阅从 Airflow 1 到 Airflow 2 的迁移指南

  • 在本指南中,您将迁移到 Airflow 2 和迁移到 Cloud Composer 2 合并到一个迁移过程中。通过这种方式,您无需在迁移到 Cloud Composer 2 之前使用 Airflow 2 迁移到 Cloud Composer 1 环境。

第 1 步:升级到 Airflow 1.10.15

如果环境使用低于 1.10.15 的 Airflow 版本,请将环境升级到使用 Airflow 1.10.15 的 Cloud Composer 版本

第 2 步:检查与 Airflow 2 的兼容性

如需检查与 Airflow 2 是否存在潜在冲突,请参阅“升级到 Airflow 2.0+ 指南中的 升级 DAG

您可能会遇到的一个常见问题与不兼容的导入有关 路径。如需详细了解如何解决此兼容性问题,请参阅 升级到 Airflow 2.0+ 指南,请参阅 关于向后移植提供程序的部分

第 3 步:获取配置替换、自定义 PyPI 软件包和环境变量的列表

控制台

获取 Cloud Composer 1 环境的配置替换、自定义 PyPI 软件包和环境变量的列表:

  1. 前往 Google Cloud 控制台中的环境页面:

    转到“环境”

  2. 选择 Cloud Composer 1 环境。

  3. 环境变量标签页上查看环境变量。

  4. Airflow 配置替换标签页上查看配置替换。

  5. PyPI 软件包标签页中查看自定义 PyPI 软件包。

gcloud

如需获取环境变量列表,请运行以下命令:

gcloud composer environments describe \
    COMPOSER_1_ENV \
    --location COMPOSER_1_LOCATION \
    --format="value(config.softwareConfig.envVariables)"

如需获取环境的 Airflow 配置替换列表,请运行以下命令:

gcloud composer environments describe \
    COMPOSER_1_ENV \
    --location COMPOSER_1_LOCATION \
    --format="value(config.softwareConfig.airflowConfigOverrides)"

如需获取自定义 PyPI 软件包的列表,请运行以下命令:

gcloud composer environments describe \
    COMPOSER_1_ENV \
    --location COMPOSER_1_LOCATION \
    --format="value(config.softwareConfig.pypiPackages)"

替换:

  • COMPOSER_1_ENV 替换为 Cloud Composer 1 环境的名称。
  • COMPOSER_1_LOCATION 替换为 Cloud Composer 1 环境所在的区域。

Terraform

跳过此步骤。Cloud Composer 1 环境的配置已列出了您的环境的配置替换、自定义 PyPI 软件包和环境变量。

第 4 步:创建 Cloud Composer 2 环境

在此步骤中,创建一个 Cloud Composer 2 环境。您可以从符合预期资源需求的环境预设开始,然后再进一步扩缩和优化环境。

控制台

创建 Cloud Composer 2 环境指定配置替换和环境变量

或者,您可以在创建环境后替换 Airflow 配置和环境变量

Airflow 1 中的一些配置选项在 Airflow 2 中使用不同的名称和部分。如需了解详情,请参阅配置更改

gcloud

创建 Cloud Composer 2 环境指定配置替换和环境变量

或者,您可以在创建环境后替换 Airflow 配置和环境变量

Airflow 1 中的一些配置选项在 Airflow 2 中使用不同的名称和部分。如需了解详情,请参阅配置更改

Terraform

根据 Cloud Composer 1 环境的配置创建 Cloud Composer 2 环境:

  1. 复制 Cloud Composer 1 环境的配置。
  2. 更改环境的名称。
  3. 使用 google-beta 提供方:

    resource "google_composer_environment" "example_environment_composer_2" {
      provider = google-beta
      # ...
    }
    
  4. config.software_config 块中指定 Cloud Composer 2 映像:

    software_config {
      image_version = "composer-2.9.7-airflow-2.9.3"
      # ...
    }
    
  5. 如果尚未指定配置替换和环境变量,请指定。

  6. config.software_config.pypi_packages 块中指定自定义 PyPI 软件包:

    software_config {
    
      # ...
    
      pypi_packages = {
        numpy = ""
        scipy = ">=1.1.0"
      }
    
    }
    

第 5 步:将 PyPI 软件包安装到 Cloud Composer 2 环境中

创建 Cloud Composer 2 环境后,请向其中安装自定义 PyPI 软件包。

控制台

  1. 前往 Google Cloud 控制台中的环境页面:

    转到“环境”

  2. 选择 Cloud Composer 2 环境。

  3. 转到 PyPI 软件包标签页,然后点击修改

  4. 从 Cloud Composer 1 环境复制 PyPI 软件包要求。点击保存,然后等待您的环境更新。

gcloud

  1. 使用自定义 PyPI 软件包列表创建一个 requirements.txt 文件:

      numpy
      scipy>=1.1.0
    
  2. 更新环境并将 requirements.txt 文件传递给 --update-pypi-packages-from-file 命令:

    gcloud composer environments update COMPOSER_2_ENV \
      --location COMPOSER_2_LOCATION  \
      --update-pypi-packages-from-file requirements.txt
    

    替换:

    • COMPOSER_2_ENV 替换为 Cloud Composer 2 环境的名称。
    • COMPOSER_2_LOCATION 替换为 Cloud Composer 2 环境所在的区域。

Terraform

跳过此步骤。创建环境时,您已经安装了自定义 PyPI 软件包。

第 6 步:转移变量和池

Airflow 支持将变量和池导出到 JSON 文件。然后,您可以将这些文件导入 Cloud Composer 2 环境。

此步骤中使用的 Airflow CLI 命令适用于 Airflow 工作器中的本地文件。如需上传或下载文件,请使用环境的 Cloud Storage 存储桶中的 /data 文件夹。此文件夹会同步到 Airflow 工作器中的 /home/airflow/gcs/data/ 目录。在 Airflow CLI 命令的参数 FILEPATH 中指定 /home/airflow/gcs/data/

gcloud

  1. 从 Cloud Composer 1 环境中导出变量:

    gcloud composer environments run \
        COMPOSER_1_ENV \
        --location COMPOSER_1_LOCATION \
         variables -- -e /home/airflow/gcs/data/variables.json
    

    替换:

    • COMPOSER_1_ENV 替换为 Cloud Composer 1 环境的名称。
    • COMPOSER_1_LOCATION 替换为 Cloud Composer 1 环境所在的区域。
  2. 从 Cloud Composer 1 环境中导出池:

    gcloud composer environments run COMPOSER_1_ENV \
        --location COMPOSER_1_LOCATION \
         pool -- -e /home/airflow/gcs/data/pools.json
    

    替换:

    • COMPOSER_1_ENV 替换为 Cloud Composer 1 环境的名称。
    • COMPOSER_1_LOCATION 替换为 Cloud Composer 1 环境所在的区域。
  3. 获取 Cloud Composer 2 环境的存储桶 URI。

    1. 运行以下命令:

      gcloud composer environments describe COMPOSER_2_ENV \
          --location COMPOSER_2_LOCATION \
           --format="value(config.dagGcsPrefix)"
      

      替换:

      • COMPOSER_2_ENV 替换为 Cloud Composer 2 环境的名称。
      • COMPOSER_2_LOCATION 替换为环境所在的区域。
    2. 在输出中,移除 /dags 文件夹。结果是 Cloud Composer 2 环境存储桶的 URI。

      例如,将 gs://us-central1-example-916807e1-bucket/dags 更改为 gs://us-central1-example-916807e1-bucket

  4. 将包含变量和池的 JSON 文件转移到 Cloud Composer 2 环境:

    gcloud composer environments storage data export \
        --destination=COMPOSER_2_BUCKET/data \
        --environment=COMPOSER_1_ENV \
        --location=COMPOSER_1_LOCATION \
        --source=variables.json
    
    gcloud composer environments storage data export \
        --destination=COMPOSER_2_BUCKET/data \
        --environment=COMPOSER_1_ENV \
        --location=COMPOSER_1_LOCATION \
        --source=pools.json
    

    替换:

    • COMPOSER_2_BUCKET 替换为在上一步获取的 Cloud Composer 2 环境存储桶的 URI。
    • COMPOSER_1_ENV 替换为 Cloud Composer 1 环境的名称。
    • COMPOSER_1_LOCATION 替换为 Cloud Composer 1 环境所在的区域。
  5. 将变量和池导入 Cloud Composer 2:

    gcloud composer environments run \
        COMPOSER_2_ENV \
        --location COMPOSER_2_LOCATION \
        variables import \
        -- /home/airflow/gcs/data/variables.json
    
    gcloud composer environments run \
        COMPOSER_2_ENV \
        --location COMPOSER_2_LOCATION \
        pools import \
        -- /home/airflow/gcs/data/pools.json
    
  6. 检查已导入的变量和池:

    gcloud composer environments run \
        COMPOSER_2_ENV \
        --location COMPOSER_2_LOCATION \
        variables list
    
    gcloud composer environments run \
        COMPOSER_2_ENV \
        --location COMPOSER_2_LOCATION \
        pools list
    
  7. 从存储分区中移除 JSON 文件:

    gcloud composer environments storage data delete \
        variables.json \
        --environment=COMPOSER_2_ENV \
        --location=COMPOSER_2_LOCATION
    
    gcloud composer environments storage data delete \
        pools.json \
        --environment=COMPOSER_2_ENV \
        --location=COMPOSER_2_LOCATION
    
    gcloud composer environments storage data delete \
        variables.json \
        --environment=COMPOSER_1_ENV \
        --location=COMPOSER_1_LOCATION
    
    gcloud composer environments storage data delete \
        pools.json \
        --environment=COMPOSER_1_ENV \
        --location=COMPOSER_1_LOCATION
    

第 7 步:从 Cloud Composer 1 环境的存储桶转移其他数据

从 Cloud Composer 1 环境的存储桶转移插件和其他数据。

gcloud

  1. 插件转移到 Cloud Composer 2 环境。为此,请将插件从 Cloud Composer 1 环境的存储桶导出到 Cloud Composer 2 环境存储桶的 /plugins 文件夹中:

    gcloud composer environments storage plugins export \
        --destination=COMPOSER_2_BUCKET/plugins \
        --environment=COMPOSER_1_ENV \
        --location=COMPOSER_1_LOCATION
    
  2. 检查 /plugins 文件夹是否已成功导入:

    gcloud composer environments storage plugins list \
        --environment=COMPOSER_2_ENV \
        --location=COMPOSER_2_LOCATION
    
  3. /data 文件夹从 Cloud Composer 1 环境导出到 Airflow 2 环境:

    gcloud composer environments storage data export \
        --destination=COMPOSER_2_BUCKET/data \
        --environment=COMPOSER_1_ENV \
        --location=COMPOSER_1_LOCATION
    
  4. 检查 /data 文件夹是否已成功导入:

    gcloud composer environments storage data list \
        --environment=COMPOSER_2_ENV \
        --location=COMPOSER_2_LOCATION
    

第 8 步:转移连接

Airflow 1.10.15 不支持导出连接。如需转移连接,请在 Cloud Composer 2 环境中通过 Cloud Composer 1 环境手动创建连接。

gcloud

  1. 如需获取 Cloud Composer 1 环境中的连接列表,请运行以下命令:

    gcloud composer environments run COMPOSER_1_ENV \
        --location COMPOSER_1_LOCATION \
         connections -- --list
    
  2. 如需在 Cloud Composer 2 环境中创建新连接,请通过 gcloud 运行 connections Airflow CLI 命令。例如:

    gcloud composer environments run \
        COMPOSER_2_ENV \
        --location COMPOSER_2_LOCATION \
        connections add \
        -- --conn-host postgres.example.com \
        --conn-port 5432 \
        --conn-type postgres \
        --conn-login example_user \
        --conn-password example_password \
        --conn-description "Example connection" \
        example_connection
    

第 9 步:转移用户账号

此步骤说明如何通过手动创建用户来转移用户。

Airflow 1.10.15 不支持导出用户。如需转移用户和连接,请在 Airflow 2 环境中通过 Cloud Composer 1 环境手动创建新的用户账号。

Airflow 界面

  1. 如需查看 Cloud Composer 1 环境中的用户列表,请执行以下操作:

    1. 打开 Cloud Composer 1 环境的 Airflow 网页界面

    2. 转到管理 > 用户

  2. 如需在 Cloud Composer 2 环境中创建用户,请执行以下操作:

    1. 打开 Cloud Composer 2 环境的 Airflow 网页界面

    2. 转到安全性 > 列出用户

    3. 点击添加新记录

gcloud

  1. 无法在 Airflow 1 中通过 gcloud 查看用户列表。请使用 Airflow 界面。

  2. 如需在 Cloud Composer 2 环境中创建新的用户账号,请运行 users create Airflow CLI 命令gcloud。例如:

    gcloud composer environments run \
        COMPOSER_2_ENV \
        --location COMPOSER_2_LOCATION \
        users create \
        -- --username example_username \
        --firstname Example-Name \
        --lastname Example-Surname \
        --email example-user@example.com \
        --use-random-password \
        --role Op
    

    替换:

    • COMPOSER_2_ENV 替换为 Cloud Composer 2 环境的名称。
    • COMPOSER_2_LOCATION 替换为 Cloud Composer 2 环境所在的区域。
    • Cloud Composer 1 环境中的所有用户配置参数及其值,包括用户的角色。

第 10 步:确保您的 DAG 已准备好使用 Airflow 2

将 DAG 转移到 Cloud Composer 1 环境之前,请确保:

  1. 您的 DAG 已成功运行,没有剩余的兼容性问题。

  2. 您的 DAG 会使用正确的导入语句

    例如,BigQueryCreateDataTransferOperator 的新导入语句可能如下所示:

    from airflow.providers.google.cloud.operators.bigquery_dts \
        import BigQueryCreateDataTransferOperator
    
  3. 您的 DAG 已针对 Airflow 2 升级。此变更与 Airflow 1.10.14 及更高版本兼容。

第 11 步:将 DAG 转移到 Cloud Composer 2 环境

在环境之间转移 DAG 时,可能会出现以下问题:

  • 如果两个环境中都启用了(未暂停)某个 DAG,则每个环境都会安排自己的 DAG 副本。这可能会导致针对相同的数据和执行时间重复运行 DAG。

  • 由于 DAG 同步,Airflow 从 DAG 中指定的开始日期开始安排额外的 DAG 运行。之所以发生这种情况,是因为新的 Airflow 实例不考虑从 Cloud Composer 1 环境运行 DAG 的历史记录。这可能会导致从指定的开始日期开始安排大量 DAG 运行。

防止重复的 DAG 运行

在 Cloud Composer 2 环境中或者在 Airflow 2 环境中为 dags_are_paused_at_creation 选项添加 Airflow 配置选项替换。进行此项更改后,默认情况下,所有新 DAG 都会暂停。

部分
core dags_are_paused_at_creation True

防止额外运行或缺失的 DAG 运行

为避免执行日期出现间隔和重叠,请在 Cloud Composer 2 中停用同步。通过这种方式,您在将 DAG 上传到 Cloud Composer 2 环境后,Airflow 不会安排已在 Cloud Composer 1 环境中进行的 DAG 运行作业。为 catchup_by_default 选项添加 Airflow 配置选项替换

部分
scheduler catchup_by_default False

将 DAG 转移到 Cloud Composer 2 环境

如需将 DAG 转移到 Cloud Composer 2 环境,请执行以下操作:

  1. 将 DAG 从 Cloud Composer 1 环境上传到 Cloud Composer 2 环境。跳过 airflow_monitoring.py DAG。

  2. 由于配置替换,DAG 在 Cloud Composer 2 环境中被暂停,因此未安排任何 DAG 运行。

  3. Airflow 网页界面中,转到 DAG 并查看报告的 DAG 语法错误。

  4. 当您计划转移 DAG 时,请执行以下操作:

    1. 在 Cloud Composer 1 环境中暂停 DAG。

    2. 在 Cloud Composer 2 环境中取消暂停 DAG。

    3. 检查新的 DAG 运行是否安排在正确的时间。

    4. 等待 DAG 在 Cloud Composer 2 环境中运行并检查运行是否成功。如果 DAG 运行成功,请不要在 Cloud Composer 1 环境中取消暂停该 DAG。如果这样做,则 DAG 会在 Cloud Composer 1 环境中以相同的时间和日期运行。

  5. 如果特定 DAG 运行失败,请尝试对该 DAG 进行问题排查,直到它在 Cloud Composer 2 中成功运行为止。

    如果需要,您可以随时回退到 DAG 的 Cloud Composer 1 版本,并从 Cloud Composer 1 环境执行在 Cloud Composer 2 中失败的 DAG 运行:

    1. 在 Cloud Composer 2 环境中暂停 DAG。

    2. 在 Cloud Composer 1 环境中取消暂停 DAG。这安排了 DAG 在 Cloud Composer 1 环境中暂停时的 DAG 运行情况。

第 12 步:监控 Cloud Composer 2 环境

将所有 DAG 和配置转移到 Cloud Composer 2 环境后,请监控其是否存在潜在问题、DAG 运行失败以及环境整体运行状况。如果 Cloud Composer 2 环境在足够长的时间内正常运行,请考虑删除 Cloud Composer 1 环境。

后续步骤