使用 Cloud Composer 实现基础架构自动化

本教程演示了一种使用 Cloud Composer 实现云基础架构自动化的方法。本示例介绍了如何安排 Compute Engine 虚拟机实例的自动备份。

Cloud Composer 是 Google Cloud 上的一项全代管式工作流编排服务。借助 Cloud Composer,您可以使用 Python API 编写工作流,安排它们自动运行或手动启动它们,并通过图形界面实时监控任务的执行情况。

Cloud Composer 以 Apache Airflow 为基础开发。Google 在 Google Kubernetes Engine (GKE) 集群之上运行此开源编排平台。该集群负责管理 Airflow 工作器,并提供大量与其他 Google Cloud 产品的集成机会。

本教程适用于对自动化基础架构感兴趣并希望深入了解 Cloud Composer 核心功能的操作员、IT 管理员和开发者。本教程不是企业级灾难恢复 (DR) 指南,也不是备份的最佳做法指南。如需了解如何为企业创建灾难恢复计划的详细信息,请参阅灾难恢复计划指南

定义架构

Cloud Composer 工作流通过创建有向无环图 (DAG) 来定义。从 Airflow 的角度来看,DAG 是为了反映其定向相互依赖性而组织起来的任务集合。在本教程中,您将学习如何定义一个定期运行并使用永久性磁盘快照备份 Compute Engine 虚拟机实例的 Airflow 工作流。

本示例中使用的 Compute Engine 虚拟机包含一个具有关联的永久性启动磁盘的实例。Cloud Composer 备份工作流遵循稍后描述的快照准则调用 Compute Engine API 以停止实例、获取永久性磁盘的快照,并重启实例。在这些任务之间,工作流等待每项操作完成再进行后续操作。

下图总结了其基础架构:

实现基础架构自动化的架构

在开始学习本教程之前,下一部分将向您介绍如何创建一个 Cloud Composer 环境。此环境的优点在于它可以使用多个 Google Cloud 产品,而且您不必单独配置每个产品。

  • Cloud Storage:Airflow DAG、插件和日志存储在 Cloud Storage 存储分区中。
  • Google Kubernetes Engine:Airflow 平台基于微服务架构,适合在 GKE 中运行。
    • Airflow 工作器使用 Compute Engine API 从 Cloud Storage 加载插件和工作流定义并运行每个任务。
    • Airflow 调度程序确保根据配置的节奏和正确的任务顺序执行备份。
    • Redis 用作 Airflow 组件之间的消息代理。
    • Cloud SQL Proxy 用于与元数据代码库进行通信。
  • Cloud SQL 和 App Engine Flex:Cloud Composer 还使用 Cloud SQL 实例储存元数据,并使用 App Engine Flex 应用提供 Airflow UI。图中未显示这些资源,因为它们属于 Google 管理的另一项目中。

如需了解更多详情,请参阅 Cloud Composer 概述

扩展工作流

本教程中介绍的用例很简单:根据固定的时间表截取单个虚拟机的快照。但是,真实场景中可能包含数百个属于组织的不同部分或系统的不同层的虚拟机,并且每个虚拟机都需要不同的备份计划。扩展不仅适用于我们使用 Compute Engine 虚拟机的示例,也适用于需要运行调度进程的任何基础架构组件。

Cloud Composer 擅长应对这些复杂的场景,因为它是基于云端托管的 Apache Airflow 的完整工作流引擎,而不仅仅是 Cloud Schedulercron 的替代方案。

Airflow DAG 是工作流的灵活表示方式,可以在从单个代码库运行的同时适应复杂的实际需求。要构建适合您用例的 DAG,您可以结合使用以下两种方法:

  • 为一组基础架构组件创建单个 DAG 实例,其中使用相同的时间表来启动进程。
  • 为需要单独时间表的一组基础架构组件组创建多个独立的 DAG 实例。

DAG 可以并行处理组件。任务必须为每个组件启动异步操作,或者您必须创建一个分支来处理各个组件。您可以从代码动态构建 DAG,以根据需要添加或删除分支和任务。

此外,您可以对同一 DAG 中各应用层之间的依赖关系建模。例如:您可能希望在停止任何应用服务器实例之前停止所有网络服务器实例。

这些优化超出了当前教程的探讨范围,因此不再深入讲解。

使用快照的最佳做法

Persistent Disk 是耐用的块存储设备,可以挂接到虚拟机实例,用作实例的主启动磁盘或关键数据的辅助非启动磁盘。永久性磁盘可用性高,因为对于每次写入,都会写入三个副本,但 Google Cloud 客户只需为其中一个付费。

快照是永久性磁盘在给定时间点的精确副本。快照经过增量和压缩,并以透明方式存储在 Cloud Storage 中。

在应用运行时,可以截取任何永久性磁盘的快照。快照不会包含部分写入的块。但如果后端收到快照创建请求时,跨多个块的写入操作正在进行中,则该快照可能只包含部分已更新的块。对于这些不一致问题,通过与解决非正常关停问题相同的方式处理即可。

我们建议您遵循以下准则以确保快照一致:

  • 在快照创建过程中尽量减少或避免磁盘写入。例如,安排在非高峰时段进行备份。
  • 对于辅助非启动磁盘,可暂停写入数据的应用和进程并冻结或卸载文件系统。
  • 对于启动磁盘,冻结根卷是不安全或不可行的。因此,最好在截取快照之前停止虚拟机实例。

    为避免因冻结或停止虚拟机而导致服务关停,建议使用高可用性架构。如需了解更多信息,请参阅应用的灾难恢复方案

  • 为快照使用一致的命名惯例。例如,使用适当粒度的时间戳,并与实例、磁盘和区域的名称串联。

如需了解创建一致快照的更多信息,请参阅快照最佳做法

目标

  • 为 Compute Engine 创建自定义 Airflow 操作和传感器。
  • 使用 Airflow 操作和传感器创建 Cloud Composer 工作流。
  • 安排工作流以定期备份 Compute Engine 实例。

费用

您可使用价格计算器根据您的预计用量来估算费用。

准备工作

  1. 登录您的 Google 帐号。

    如果您还没有 Google 帐号,请注册一个新帐号

  2. 在 Cloud Console 的项目选择器页面上,选择或创建 Cloud 项目。

    转到项目选择器页面

  3. 确保您的 Google Cloud 项目已启用结算功能。 了解如何确认您的项目已启用结算功能

  4. 创建一个 Cloud Composer 环境。要最大限度地降低费用,请选择 20GB 的磁盘大小。

    转到“创建环境”页面

    预配 Cloud Composer 环境通常需要约 15 分钟,但最多可能需要一个小时。

  5. GitHub 上提供了本教程的完整代码。要在后续操作中检查文件,请打开 Cloud Shell 中的代码库:

    转到 Cloud Shell
  6. 在 Cloud Shell 控制台主目录中,运行以下命令:
    git clone https://github.com/GoogleCloudPlatform/composer-infra-python.git

完成本教程后,您可以删除所创建的资源以避免继续计费。如需了解详情,请参阅清理

设置示例 Compute Engine 实例

第一步是创建要备份的示例 Compute Engine 虚拟机实例。该实例运行 WordPress,一个开源内容管理系统。

按照以下步骤在 Compute Engine 上创建该 WordPress 实例:

  1. 在 Google Cloud Marketplace 中,转到 Bitnami 认证的 WordPress 启动页面。
  2. 点击启动
  3. 此时将出现一个显示您的项目列表的弹出窗口。选择您之前为本教程创建的项目。

    Google Cloud 会配置项目所需的 API,短暂的等待后,它会显示一个屏幕,其中显示您的 WordPress Compute Engine 实例的不同配置选项。

  4. (可选)将启动磁盘类型更改为 SSD 以提高实例启动速度。

  5. 点击部署

    之后您将进入部署管理器屏幕,可以在其中查看部署的状态。

    WordPress 部署管理器脚本会创建 WordPress Compute Engine 实例和两项防火墙规则,以允许 TCP 流量通过端口 80 和 443 传输到实例。此过程可能需要几分钟,期间每个项目都会被部署并显示进度轮图标。

    完成此过程后,您的 WordPress 实例将准备就绪并开始提供网站网址上的默认内容。然后,您可以在 Deployment Manager 屏幕看到网站网址(站点地址)、管理控制台网址(管理员网址)及其用户和密码、文档链接以及建议的后续步骤。

    Deployment Manager 显示已部署的实例

  6. 点击站点地址以验证您的 WordPress 实例已启动并正在运行。此时您应该看到一个默认的 WordPress 博客页面。

现在,示例 Compute Engine 实例已准备就绪。下一步是配置该实例的永久性磁盘的自动增量备份过程。

创建自定义 Airflow 操作

要备份测试实例的永久性磁盘,可以创建一个 Airflow 工作流,用于停止实例、获取其永久性磁盘的快照,然后重启实例。这些任务中的每一个都被定义为具有自定义 Airflow 操作的代码。接着,操作的代码会被分组到 Airflow 插件中。

在本部分中,您将学习如何构建调用 Compute Engine Python 客户端库以控制实例生命周期的自定义 Airflow 操作。您也可以选择其他方案来完成此操作,例如:

  • 使用 Airflow BashOperator 执行 gcloud compute 命令
  • 使用 Airflow HTTPOperator 直接对 Compute Engine REST API 执行 HTTP 调用。
  • 使用 Airflow PythonOperator 调用任意 Python 函数,而无需定义自定义操作。

请注意,本教程不会探讨这些替代方案。

授权调用 Compute Engine API

您在本教程中创建的自定义操作使用 Python 客户端库来调用 Compute Engine API。对 API 的请求必须进行身份验证和授权。因此,建议使用名为应用默认凭据 (ADC) 的策略。

无论何时从客户端库进行调用,都可应用 ADC 策略:

  1. 库将验证环境变量 GOOGLE_APPLICATION_CREDENTIALS 中是否指定了服务帐号。
  2. 如果未指定服务帐号,则库将使用 Compute Engine 或 GKE 提供的默认服务帐号。

如果这两种方法失败,则会发生错误。

本教程中的 Airflow 操作属于第二种方法。创建 Cloud Composer 环境时,会配置一个 GKE 集群。此集群的节点在 Airflow 工作器 pod 上运行。因此,这些工作器使用您定义的自定义操作执行工作流。由于您在创建环境时未指定服务帐号,因此 ADC 策略会使用 GKE 集群节点的默认服务帐号

GKE 集群节点属于 Compute Engine 实例,因此,在操作代码中获取与 Compute Engine 默认服务帐号关联的凭据非常简单。

def get_compute_api_client(self):
  credentials = GoogleCredentials.get_application_default()
  return googleapiclient.discovery.build(
      'compute', 'v1', cache_discovery=False, credentials=credentials)

此代码使用默认应用凭据创建将向 Compute Engine API 发送请求的 Python 客户端。在以下部分中,您将在创建每个 Airflow 操作时引用此代码。

作为使用默认 Compute Engine 服务帐号的替代方案,可以创建一个服务帐号并将其配置为与 Airflow 管理控制台相连接。管理 Airflow 连接页面中描述了此方法,它允许对 Google Cloud 资源进行更精细的访问控制。但请注意,本教程不会探讨此替代方案。

安全关停 Compute Engine 实例

本部分分析如何创建第一个自定义 Airflow 操作:StopInstanceOperator。此操作调用 Compute Engine API 以停止正在运行 WordPress 的 Compute Engine 实例:

  1. 在 Cloud Shell 中,使用文本编辑器(如 nano 或 vim)打开 gce_commands_plugin.py 文件:

    vi $HOME/composer-infra-python/no_sensor/plugins/gce_commands_plugin.py
    
  2. 检查文件开始的导入部分:

    import datetime
    import logging
    import time
    from airflow.models import BaseOperator
    from airflow.plugins_manager import AirflowPlugin
    from airflow.utils.decorators import apply_defaults
    import googleapiclient.discovery
    from oauth2client.client import GoogleCredentials

    值得注意的导入有:

    • BaseOperator:所有 Airflow 自定义操作都需要继承的基类。
    • AirflowPlugin:用于创建一组操作以形成一个插件的基类。
    • apply_defaults:未在操作构造函数中指定参数时,使用默认值填充参数的函数修饰符。
    • GoogleCredentials:用于检索应用默认凭据的类。
    • googleapiclient.discovery:可发现底层 Google API 的客户端库入口点。在本例中,客户端库构建一个资源以与 Compute Engine API 进行交互。
  3. 接下来,查看导入部分下方的 StopInstanceOperator 类:

    class StopInstanceOperator(BaseOperator):
      """Stops the virtual machine instance."""
    
      @apply_defaults
      def __init__(self, project, zone, instance, *args, **kwargs):
        self.compute = self.get_compute_api_client()
        self.project = project
        self.zone = zone
        self.instance = instance
        super(StopInstanceOperator, self).__init__(*args, **kwargs)
    
      def get_compute_api_client(self):
        credentials = GoogleCredentials.get_application_default()
        return googleapiclient.discovery.build(
            'compute', 'v1', cache_discovery=False, credentials=credentials)
    
      def execute(self, context):
        logging.info('Stopping instance %s in project %s and zone %s',
                     self.instance, self.project, self.zone)
        self.compute.instances().stop(
            project=self.project, zone=self.zone, instance=self.instance).execute()
        time.sleep(90)

    StopInstanceOperator 类有三个方法:

    • __init__:类构造函数。接收项目名称、正在运行实例的区域以及要停止的实例的名称。此外,它通过调用 get_compute_api_client 初始化 self.compute 变量。
    • get_compute_api_client:返回 Compute Engine API 实例的辅助方法。它使用 GoogleCredentials 提供的 ADC 对 API 进行身份验证并授权后续调用。
    • execute:从 BaseOperator 覆写的主操作方法。Airflow 调用此方法来运行操作。该方法将信息消息打印到日志,然后调用 Compute Engine API 以停止由构造函数中接收的三个参数指定的 Compute Engine 实例。最后的 sleep() 函数等到实例停止。在生产环境中,您必须使用更具确定性的方法,如操作交叉通信。本教程后面会介绍该方法。

Compute Engine API 中的 stop() 方法可正常关停虚拟机实例。具体来说,操作系统执行 init.d 关停脚本,包括位于 /etc/init.d/bitnami 的针对 WordPress 的脚本。当虚拟机再次启动时,此脚本还会处理 WordPress 的启动。您可以使用 /etc/systemd/system/bitnami.service. 上的关停和启动配置检查服务定义。

创建唯一命名的增量备份快照

本部分将创建第二个自定义操作:SnapshotDiskOperator。此操作可获取实例的永久性磁盘的快照。

在上一部分中打开的 gce_commands_plugin.py 文件中,查看 SnapshotDiskOperator 类:

class SnapshotDiskOperator(BaseOperator):
  """Takes a snapshot of a persistent disk."""

  @apply_defaults
  def __init__(self, project, zone, instance, disk, *args, **kwargs):
    self.compute = self.get_compute_api_client()
    self.project = project
    self.zone = zone
    self.instance = instance
    self.disk = disk
    super(SnapshotDiskOperator, self).__init__(*args, **kwargs)

  def get_compute_api_client(self):
    credentials = GoogleCredentials.get_application_default()
    return googleapiclient.discovery.build(
        'compute', 'v1', cache_discovery=False, credentials=credentials)

  def generate_snapshot_name(self, instance):
    # Snapshot name must match regex '(?:[a-z](?:[-a-z0-9]{0,61}[a-z0-9])?)'
    return ('' + self.instance + '-' +
            datetime.datetime.now().strftime('%Y-%m-%d-%H%M%S'))

  def execute(self, context):
    snapshot_name = self.generate_snapshot_name(self.instance)
    logging.info(
        ("Creating snapshot '%s' from: {disk=%s, instance=%s, project=%s, "
         "zone=%s}"),
        snapshot_name, self.disk, self.instance, self.project, self.zone)
    self.compute.disks().createSnapshot(
        project=self.project, zone=self.zone, disk=self.disk,
        body={'name': snapshot_name}).execute()
    time.sleep(120)

SnapshotDiskOperator 类具有以下方法:

  • __init__:类构造函数。与 StopInstanceOperator 类中的构造函数类似,但此构造函数除了接收项目、区域和实例名称之外,还接收创建快照的磁盘的名称。这是因为一个实例可以挂接多个永久性磁盘
  • generate_snapshot_name:此示例方法使用实例名称、日期和时间(粒度为一秒),为每个快照创建一个简单的唯一名称。当然,您也可根据需要调整名称,例如,在多个磁盘挂接到一个实例时,可添加磁盘名称,或者通过增加时间粒度来支持临时快照创建请求。
  • execute:从 BaseOperator 覆写的主操作方法。Airflow 工作器执行该方法时,它使用 generate_snapshot_name 方法生成快照名称。然后,它会打印一条信息消息并调用 Compute Engine API 以使用构造函数中收到的参数创建快照。

启动 Compute Engine 实例

在本部分中,您将创建第三个,也是最后一个自定义操作:StartInstanceOperator。此操作可重新启动 Compute Engine 实例。

在之前打开的 gce_commands_plugin.py 文件中,查看文件末尾的 SnapshotDiskOperator 类:

class StartInstanceOperator(BaseOperator):
  """Starts a virtual machine instance."""

  @apply_defaults
  def __init__(self, project, zone, instance, *args, **kwargs):
    self.compute = self.get_compute_api_client()
    self.project = project
    self.zone = zone
    self.instance = instance
    super(StartInstanceOperator, self).__init__(*args, **kwargs)

  def get_compute_api_client(self):
    credentials = GoogleCredentials.get_application_default()
    return googleapiclient.discovery.build(
        'compute', 'v1', cache_discovery=False, credentials=credentials)

  def execute(self, context):
    logging.info('Starting instance %s in project %s and zone %s',
                 self.instance, self.project, self.zone)
    self.compute.instances().start(
        project=self.project, zone=self.zone, instance=self.instance).execute()
    time.sleep(20)

StartInstanceOperator 类具有以下方法:

  • __init__:类构造函数。与 StopInstanceOperator 类中的构造函数类似。
  • execute:从 BaseOperator 覆写的主操作方法。与先前操作的不同之处在于调取适当的 Compute Engine API 以启动构造函数输入参数中指示的实例。

定义 Airflow 工作流

之前,您定义了一个包含三个操作的 Airflow 插件。这些操作定义了构成 Airflow 工作流一部分的任务。此处介绍的工作流简单而线性,但 Airflow 工作流可以是复杂的有向无环图。

此部分将创建公开三个操作的插件类,然后使用这些操作创建 DAG,将 DAG 部署到 Cloud Composer,最后运行 DAG。

创建插件

到目前为止,gce_commands_plugin.py 文件包含启动、快照和停止操作。要在工作流中使用这些操作,必须将它们包含在插件类中。

  1. 请注意 gce_commands_plugin.py 文件末尾的 GoogleComputeEnginePlugin 类:

    class GoogleComputeEnginePlugin(AirflowPlugin):
      """Expose Airflow operators."""
    
      name = 'gce_commands_plugin'
      operators = [StopInstanceOperator, SnapshotDiskOperator,
                   StartInstanceOperator]

    该类继承自 AirflowPlugin,为插件提供内部名称 gce_commands_plugin,并将三个操作添加到其中。

  2. 关闭 gce_commands_plugin.py 文件。

配置有向无环图

DAG 定义了 Airflow 要执行的工作流。要让 DAG 知道要备份哪个磁盘,需要定义一些变量,例如磁盘所挂接的 Compute Engine 实例、运行实例的区域以及提供所有资源的项目。

您可以将这些变量硬编码入 DAG 源代码中,但最佳做法是将它们定义为 Airflow 变量。这样,可以独立于代码部署集中管理任何配置更改。

定义 DAG 配置:

  1. 在 Cloud Shell 中,设置 Cloud Composer 环境的位置:

    LOCATION=[CLOUD_ENV_LOCATION]
    

    该位置是 Cloud Composer 环境所在的 Compute Engine 地区,例如 us-central1europe-west1。它是在创建环境时设置的,可在 Cloud Composer 控制台页面中找到。

  2. 设置 Cloud Composer 环境名称:

    ENVIRONMENT=$(gcloud composer environments list \
        --format="value(name)" --locations $LOCATION)
    

    --format 参数用于仅从生成的表中选择 name 列。您可以假设只创建了一个环境。

  3. 使用当前 Google Cloud 项目的名称创建 Airflow 中的 PROJECT 变量:

    gcloud composer environments run $ENVIRONMENT --location $LOCATION \
        variables -- --set PROJECT $(gcloud config get-value project)
    

    其中:

    • gcloud composer environments run 用于运行 Airflow CLI 命令。
    • variables Airflow 命令将 PROJECT Airflow 变量设置为 gcloud config 返回的值
  4. 使用 WordPress 实例的名称创建 Airflow 中的 INSTANCE 变量:

    gcloud composer environments run $ENVIRONMENT --location $LOCATION \
        variables -- --set INSTANCE \
        $(gcloud compute instances list \
        --format="value(name)" --filter="name~'.*wordpress.*'")
    

    此命令使用 --filter 参数仅选择 name 与包含字符串 wordpress 的正则表达式匹配的实例。这种方法假设只有一个这样的实例,并且“wordpress”是您的实例和磁盘名称的一部分(如果您接受了默认值,则符合真实情况)。

  5. 使用 WordPress 实例的区域创建 Airflow 中的 ZONE 变量:

    gcloud composer environments run $ENVIRONMENT --location $LOCATION \
        variables -- --set ZONE \
        $(gcloud compute instances list \
        --format="value(zone)" --filter="name~'.*wordpress.*'")
    
  6. 使用挂接到 WordPress 实例的永久性磁盘的名称在 Airflow 中创建 DISK 变量:

    gcloud composer environments run $ENVIRONMENT --location $LOCATION \
        variables -- --set DISK \
        $(gcloud compute disks list \
        --format="value(name)" --filter="name~'.*wordpress.*'")
    
  7. 验证是否已正确创建 Airflow 变量:

    1. 在 Cloud Console 中,转到 Cloud Composer 页面。

      转到 Cloud Composer 页面

    2. Airflow 网络服务器列,点击 Airflow 链接。此时将打开一个显示 Airflow 网络服务器主页面的新标签页。

    3. 点击 Admin,然后点击 Variables

      该列表显示了 DAG 配置变量。

      DAG 配置变量

创建有向无环图

DAG 定义位于专门的 Python 文件中。因此,您的下一步是创建 DAG,从插件中链接三个操作。

  1. 在 Cloud Shell 中,使用文本编辑器(如 nano 或 vim)打开 backup_vm_instance.py 文件:

    vi $HOME/composer-infra-python/no_sensor/dags/backup_vm_instance.py
    
  2. 检查文件开始的导入部分:

    import datetime
    from airflow import DAG
    from airflow.models import Variable
    from airflow.operators import SnapshotDiskOperator
    from airflow.operators import StartInstanceOperator
    from airflow.operators import StopInstanceOperator
    from airflow.operators.dummy_operator import DummyOperator

    简单介绍一下这些导入:

    • DAG 是 Airflow 定义的有向无环图类
    • DummyOperator 用于创建开始和结束的无运维操作,以改进工作流可视化。在更复杂的 DAG 中,DummyOperator 可用于联接分支和创建 SubDAG
    • DAG 使用您在前面部分中定义的三个操作。
  3. 定义要传递给操作构造函数的参数的值:

    INTERVAL = '@daily'
    START_DATE = datetime.datetime(2018, 7, 16)
    PROJECT = Variable.get('PROJECT')
    ZONE = Variable.get('ZONE')
    INSTANCE = Variable.get('INSTANCE')
    DISK = Variable.get('DISK')

    其中:

    • INTERVAL 定义备份工作流的运行频率。上述代码使用 Airflow cron 预设指定一项每日重复任务。如果要使用其他间隔期,请参阅 DAG 运行参考页面。此外,您也可以独立于此时间表手动触发工作流。
    • START_DATE 定义了计划启动备份的时间点。该值无需更改。
    • 其余值将从您在上一部分中配置的 Airflow 变量中检索。
  4. 使用以下代码创建具有先前定义的部分参数的 DAG。此代码还可为 DAG 提供名称和描述,两者都显示在 Cloud Composer 界面中。

    dag1 = DAG('backup_vm_instance',
               description='Backup a Compute Engine instance using an Airflow DAG',
               schedule_interval=INTERVAL,
               start_date=START_DATE,
               catchup=False)
  5. 用任务(操作实例)填充 DAG:

    ## Dummy tasks
    begin = DummyOperator(task_id='begin', retries=1, dag=dag1)
    end = DummyOperator(task_id='end', retries=1)
    
    ## Compute Engine tasks
    stop_instance = StopInstanceOperator(
        project=PROJECT, zone=ZONE, instance=INSTANCE, task_id='stop_instance')
    snapshot_disk = SnapshotDiskOperator(
        project=PROJECT, zone=ZONE, instance=INSTANCE,
        disk=DISK, task_id='snapshot_disk')
    start_instance = StartInstanceOperator(
        project=PROJECT, zone=ZONE, instance=INSTANCE, task_id='start_instance')

    此代码将工作流所需的所有任务实例化,将定义的参数传递给对应的操作构造函数。

    • task_id 值是将在 Cloud Composer 界面中显示的唯一 ID。稍后您将使用这些 ID 在任务间传递数据。
    • retries 设置在任务失败前重试任务的次数。对于 DummyOperator 任务,可忽略这些值。
    • dag=dag 表示任务被挂接到先前创建的 DAG。仅工作流的第一项任务需要此参数。
  6. 定义组成工作流 DAG 的任务序列:

    # Airflow DAG definition
    begin >> stop_instance >> snapshot_disk >> start_instance >> end
    
  7. 关闭 gce_commands_plugin.py 文件。

运行工作流

现在,操作 DAG 所代表的工作流已准备好由 Cloud Composer 运行。Cloud Composer 从关联的 Cloud Storage 存储分区中读取 DAG 和插件定义。当您在创建 Cloud Composer 环境时,系统已自动创建此存储分区及对应的 dagsplugins 目录。

可使用 Cloud Shell 将 DAG 和插件复制到关联的 Cloud Storage 存储分区中:

  1. 在 Cloud Shell 中,获取存储分区名称:

    BUCKET=$(gsutil ls)
    echo $BUCKET
    

    您应该会看到一个存储分区,其名称采用以下格式:gs://[REGION]-{ENVIRONMENT_NAME]-{ID}-bucket/.

  2. 执行以下脚本将 DAG 和插件文件复制到对应的存储分区目录中:

    gsutil cp $HOME/composer-infra-python/no_sensor/plugins/gce_commands_plugin.py "$BUCKET"plugins
    gsutil cp $HOME/composer-infra-python/no_sensor/dags/backup_vm_instance.py "$BUCKET"dags
    

    存储分区名称中已包含尾部斜杠,因此 $BUCKET 变量前后有双引号。

  3. 在 Cloud Console 中,转到 Cloud Composer 页面。

    转到 Cloud Composer 页面

  4. Airflow 网络服务器列,点击 Airflow 链接。此时将打开一个显示 Airflow 网络服务器主页面的新标签页。等待两到三分钟,然后重新加载该页面。可能要经过几轮的等待和重新加载,页面才能准备就绪。

    此时将出现一个列表,其中显示了新创建的 DAG,类似以下内容:

    DAG 列表

    如果代码中存在语法错误,则 DAG 表的顶端会显示一条消息。如果存在运行时错误,则 DAG 运行下方会标记这些错误。在继续操作前,请修正所有错误。最简单的方法是将文件从 GitHub 代码库重新复制到存储分区中。

  5. 如需查看更详细的堆栈轨迹,请在 Cloud Shell 中运行以下命令:

    gcloud composer environments run $ENVIRONMENT --location $LOCATION list_dags
    
  6. Airflow 立即开始运行工作流,并显示在 Dag 运行列下。

    工作流已在进行中,但如果需要再次运行,可通过以下步骤手动触发:

    1. 链接列,点击上一个屏幕截图中标有箭头的第一个图标触发 Dag
    2. 您确定吗?的确认弹出窗口中,点击确定

      几秒钟后,工作流开始运行,新的运行将在 DAG 运行下以浅绿色圆圈显示。

  7. 链接列,点击在上一屏幕截图中以箭头标记的“图表视图”图标。

    Cloud Composer 的屏幕截图

    “图表视图”中将显示工作流,其中成功执行的任务带有深绿色边框,正在执行的任务带有浅绿色边框,待处理任务没有边框。您可以点击任务以查看日志、查看详细信息和执行其他操作。

  8. 要跟踪执行过程,请定期点击右上角的刷新按钮。

    祝贺您!您完成了第一次 Cloud Composer 工作流运行。工作流完成后,它会创建一个 Compute Engine 实例永久性磁盘的快照。

  9. 在 Cloud Shell 中,验证是否已创建快照:

    gcloud compute snapshots list
    

    或者,您可以使用 Cloud Console 菜单转到 Compute Engine 快照页面。

    “Compute Engine 快照”页面

此时应该可以看到一个快照。后续的工作流运行(手动触发或按照指定的时间表自动触发)将创建更多快照。

快照具有增量性。第一个快照的体积最大,因为它包含压缩格式的永久性磁盘中的所有块。后续快照中仅包含前一快照中有更改的块,以及对未更改块的任何引用。因此,后续快照比首个快照体积更小,生成时间更短,费用更低。

如果删除快照,其数据将转移到对应的下一快照中,以保持快照链中储存的连续增量的一致性。仅当删除所有快照时,才会删除永久性磁盘中的所有备份数据。

创建自定义 Airflow 传感器

运行工作流时,您可能注意到完成每个步骤都需要一些时间。之所以需要等待,是因为操作末尾包含一条 sleep() 指令,以便在开始下一个任务之前为 Compute Engine API 完成其工作提供时间。

然而,这种方法并不是最优的,并可能导致意外的问题。例如,在创建快照时,增量快照的等待时间可能太长,这意味着浪费时间等待已完成的任务。另外,等待时间也可能太短,这可能导致整个工作流失败或由于实例未完全停止或机器启动时快照进程尚未完成而产生不可靠的结果。

因此,您需要一个机制,让下一项任务知道前面的任务已经完成。一种解决方案是使用 Airflow 传感器,它可暂停工作流,直到满足某些标准。在这种情况下,标准是之前的 Compute Engine 操作成功完成。

跨任务共享交叉通信数据

当任务间需要通信时,Airflow 提供一种称为 XCom 或“交叉通信”的机制。XCom 允许任务间交换由密钥、值和时间戳组成的消息。

使用 XCom 传送消息的最简单方法是让操作从其 execute() 方法返回一个值。该值可以是 Python 使用 pickle 模块序列化的任何对象。

前面部分中介绍的三个操作可调用 Compute Engine API。所有这些 API 调用会返回一个 Operation 资源对象。这些对象用于管理异步请求,如 Airflow 操作上的异步请求。每个对象都有一个 name 字段,您可以使用该字段轮询 Compute Engine 操作的最新状态。

修改操作以返回 Operation 资源对象的 name

  1. 在 Cloud Shell 中,使用文本编辑器(如 nano 或 vim)打开 gce_commands_plugin.py 文件,这次是从 sensor/plugins 目录打开:

    vi $HOME/composer-infra-python/sensor/plugins/gce_commands_plugin.py
    
  2. StopInstanceOperator 的执行方法中,请注意以下代码如何:

    self.compute.instances().stop(
        project=self.project, zone=self.zone, instance=self.instance).execute()
    time.sleep(90)

    替换为此代码:

    operation = self.compute.instances().stop(
        project=self.project, zone=self.zone, instance=self.instance).execute()
    return operation['name']

    其中:

    • 第一行将 API 调用的返回值捕获到 operation 变量中。
    • 第二行从 execute() 方法返回操作 name 字段。该指令使用 pickle 将名称序列化,并将其推送到 XCom 任务内共享空间。稍后将以后进先出顺序提取该值。

    如果任务需要推送多个值,则可以通过直接调用 xcom_push() 而不是返回值来为 XCom 提供显式 key

  3. 同样的,在 SnapshotDiskOperator 的执行方法中,请注意以下代码如何:

    self.compute.disks().createSnapshot(
        project=self.project, zone=self.zone, disk=self.disk,
        body={'name': snapshot_name}).execute()
    time.sleep(120)

    替换为此代码:

    operation = self.compute.disks().createSnapshot(
        project=self.project, zone=self.zone, disk=self.disk,
        body={'name': snapshot_name}).execute()
    return operation['name']

    此代码中有两个不相关的名称。第一个是快照名称,第二个是操作名称。

  4. 最后,在 StartInstanceOperator 的执行方法中,请注意以下代码如何:

    self.compute.instances().start(
        project=self.project, zone=self.zone, instance=self.instance).execute()
    time.sleep(20)

    替换为此代码:

    operation = self.compute.instances().start(
        project=self.project, zone=self.zone, instance=self.instance).execute()
    return operation['name']
  5. 此时,在整个 gce_commands_plugin.py 文件中不应该对 sleep() 方法进行任何调用。可通过在文件中搜索 sleep 以确保这一点。否则,请仔细核查本部分中前面的步骤。

    由于没有从代码中调用 sleep(),因此从文件开始的导入部分删除了以下行:

    import time
    
  6. 关闭 gce_commands_plugin.py 文件。

实现并公开传感器

在上一部分中,您修改了每个操作以返回一个 Compute Engine 操作名称。在本部分中,您将使用该操作名称创建一个 Airflow 传感器以轮询 Compute Engine API 每个操作的完成情况。

  1. 在 Cloud Shell 中,使用文本编辑器(如 nano 或 vim)打开 gce_commands_plugin.py 文件,确保使用 sensor/plugins 目录:

    vi $HOME/composer-infra-python/sensor/plugins/gce_commands_plugin.py
    

    请注意导入部分开始的以下代码行,其位于 from airflow.models import BaseOperator 行的下方:

    from airflow.operators.sensors import BaseSensorOperator
    

    所有传感器都派生自 BaseSensorOperator 类,且必须覆写其 poke() 方法。

  2. 检查新的 OperationStatusSensor 类:

    class OperationStatusSensor(BaseSensorOperator):
      """Waits for a Compute Engine operation to complete."""
    
      @apply_defaults
      def __init__(self, project, zone, instance, prior_task_id, *args, **kwargs):
        self.compute = self.get_compute_api_client()
        self.project = project
        self.zone = zone
        self.instance = instance
        self.prior_task_id = prior_task_id
        super(OperationStatusSensor, self).__init__(*args, **kwargs)
    
      def get_compute_api_client(self):
        credentials = GoogleCredentials.get_application_default()
        return googleapiclient.discovery.build(
            'compute', 'v1', cache_discovery=False, credentials=credentials)
    
      def poke(self, context):
        operation_name = context['task_instance'].xcom_pull(
            task_ids=self.prior_task_id)
        result = self.compute.zoneOperations().get(
            project=self.project, zone=self.zone,
            operation=operation_name).execute()
    
        logging.info(
            "Task '%s' current status: '%s'", self.prior_task_id, result['status'])
        if result['status'] == 'DONE':
          return True
        else:
          logging.info("Waiting for task '%s' to complete", self.prior_task_id)
          return False

    OperationStatusSensor 类具有以下方法:

    • __init__:类构造函数。此构造函数采用与 Operators 类似的参数,但有一个例外:prior_task_id。此参数为上一任务的 ID。
    • poke:从 BaseSensorOperator 覆写的主传感器方法。Airflow 每 60 秒调用一次此方法,直到其返回 True。且仅在这种情况下才允许下游任务运行。

      您可以通过将 poke_interval 参数传递给构造函数来配置这些重试的间隔。此外,您还可以定义一个 timeout。如需了解更多信息,请参阅 BaseSensorOperator API 参考

      在前面的 poke 方法的实现中,第一行是对 xcom_pull() 的调用。此方法获取通过 prior_task_id 识别出的任务的最新 Xcom 值。该值是 Compute Engine 操作的名称,存储在 operation_name 变量中。

      然后,代码执行 zoneOperations.get() 方法,将 operation_name 作为参数传递以获取操作的最新状态。如果状态为 DONE,则 poke() 方法返回 True,否则返回 False。在前一种情况下,下游任务将开始;在后一种情况下,工作流执行保持暂停状态,poke_interval 秒后再次调用 poke() 方法。

  3. 在文件的底部,请注意 GoogleComputeEnginePlugin 类是如何更新,以将 OperationStatusSensor 添加到插件导出的操作列表中:

    class GoogleComputeEnginePlugin(AirflowPlugin):
      """Expose Airflow operators and sensor."""
    
      name = 'gce_commands_plugin'
      operators = [StopInstanceOperator, SnapshotDiskOperator,
                   StartInstanceOperator, OperationStatusSensor]
  4. 关闭 gce_commands_plugin.py 文件。

更新工作流

在插件中创建传感器后,您可以将其添加到工作流中。在本部分中,您将工作流更新为最终状态,其中包含全部三个操作以及介于其间的传感器任务。然后,您将运行并验证更新后的工作流。

  1. 在 Cloud Shell 中,使用文本编辑器(如 nano 或 vim)打开 backup_vm_instance.py 文件,这次是从 sensor/dags 目录打开:

    vi $HOME/composer-infra-python/sensor/dags/backup_vm_instance.py
    
    
  2. 在导入部分,请注意新创建的传感器被导入到 from airflow operators import StartInstanceOperator 行下方:

    from airflow.operators import OperationStatusSensor
    
  3. 检查 ## Wait tasks 注释后面的行

    ## Wait tasks
    wait_for_stop = OperationStatusSensor(
        project=PROJECT, zone=ZONE, instance=INSTANCE,
        prior_task_id='stop_instance', poke_interval=15, task_id='wait_for_stop')
    wait_for_snapshot = OperationStatusSensor(
        project=PROJECT, zone=ZONE, instance=INSTANCE,
        prior_task_id='snapshot_disk', poke_interval=10,
        task_id='wait_for_snapshot')
    wait_for_start = OperationStatusSensor(
        project=PROJECT, zone=ZONE, instance=INSTANCE,
        prior_task_id='start_instance', poke_interval=5, task_id='wait_for_start')

    该代码重复使用 OperationStatusSensor 来定义三个中间“等待任务”。这些任务中的每一个都等待前一操作的完成。以下参数传递给传感器构造函数:

    • 已在文件中定义的 WordPress 实例的 PROJECTZONEINSTANCE
    • prior_task_id:传感器正在等待的任务的 ID。例如,wait_for_stop 任务等待 ID 为 stop_instance 的任务完成。

    • poke_interval:Airflow 在重试调用传感器的 poke() 方法之间应该等待的秒数。换句话说,即验证 prior_task_id 是否已完成的频率。

    • task_id:新创建的等待任务的 ID。

  4. 在文件的底部,请注意以下代码如何:

    begin >> stop_instance >> snapshot_disk >> start_instance >> end
    

    替换为此代码:

    begin >> stop_instance >> wait_for_stop >> snapshot_disk >> wait_for_snapshot \
            >> start_instance >> wait_for_start >> end
    

    这些行定义了完整的备份工作流。

  5. 关闭 backup_vm_instance.py 文件。

现在,您需要从关联的 Cloud Storage 存储分区中复制 DAG 和插件:

  1. 在 Cloud Shell 中,获取存储分区名称:

    BUCKET=$(gsutil ls)
    echo $BUCKET
    

    您应该看到一个存储分区,其名称格式为:gs://[REGION]-[ENVIRONMENT_NAME]-[ID]-bucket/.

  2. 执行以下脚本将 DAG 和插件文件复制到对应的存储分区目录中:

    gsutil cp $HOME/composer-infra-python/sensor/plugins/gce_commands_plugin.py "$BUCKET"plugins
    gsutil cp $HOME/composer-infra-python/sensor/dags/backup_vm_instance.py "$BUCKET"dags
    

    存储分区名称已包含尾部斜杠,因此 $BUCKET 变量前后有双引号

  3. 将更新的工作流上传到 Airflow:

    1. 在 Cloud Console 中,转到 Cloud Composer 页面。

      转到 Cloud Composer 页面

    2. Airflow 列,点击 Airflow 网络服务器链接以显示 Airflow 主页。

    3. 等待两到三分钟,直到 Airflow 自动更新插件和工作流。在此期间,您可能会观察到 DAG 表暂时变空。重新加载页面几次,直到链接部分始终显示即可。

    4. 确保未显示任何错误,然后在链接部分点击树状视图

      Cloud Composer 树状视图页面的屏幕截图 在左侧,工作流以自下而上的树状表示。在右侧,任务的图表针对不同的日期运行。绿色方块表示该特定任务和日期的成功运行。白色方块表示从未运行过的任务。由于您使用新的传感器任务更新了 DAG,因此所有这些任务都以白色显示,Compute Engine 任务则以绿色显示。

    5. 运行更新的备份工作流:

      1. 在顶部菜单中,点击 DAG 以返回主页面。
      2. 链接列中,点击触发 DAG
      3. 您确定吗?确认弹出窗口中,点击确定。此时新的工作流开始运行,并在 DAG 运行列中以浅绿色圆圈表示。
    6. 链接下,点击图形视图图标以实时观察工作流的执行情况。

    7. 点击右侧的刷新按钮以跟进任务执行。请注意工作流如何停止每个传感器任务以等待上一任务完成。等待时间根据每个任务的需要进行调整,而不是依赖于硬编码的休眠值。

    Cloud Composer 任务执行的屏幕截图

  4. (可选)在工作流执行时,返回 Cloud Console,选择 Compute Engine 菜单,然后点击虚拟机实例以查看虚拟机如何关停和重启。另外,您还可以点击快照以查看正在创建的新快照。

您现在已经运行了一个备份工作流,该工作流可以从 Compute Engine 实例创建快照。此快照遵循最佳做法并使用传感器优化流程。

从快照还原实例

生成快照只是备份过程的一部分。另一部分是能从快照恢复实例。

要使用快照创建实例,请执行以下操作:

  1. 在 Cloud Shell 中,获取可用快照的列表:

    gcloud compute snapshots list
    

    输出类似于以下内容:

    NAME                              DISK_SIZE_GB  SRC_DISK                            STATUS
    wordpress-1-vm-2018-07-18-120044  10            us-central1-c/disks/wordpress-1-vm  READY
    wordpress-1-vm-2018-07-18-120749  10            us-central1-c/disks/wordpress-1-vm  READY
    wordpress-1-vm-2018-07-18-125138  10            us-central1-c/disks/wordpress-1-vm  READY
    
  2. 选择一个快照并从中创建一个独立的启动永久性磁盘。用自定义值替换括号中的占位符。

    gcloud compute disks create [DISK_NAME] --source-snapshot [SNAPSHOT_NAME] \
        --zone=[ZONE]
    

    其中:

    • DISK_NAME 是新的独立启动永久性磁盘的名称。
    • SNAPSHOT_NAME 是从上一次输出的第一列中选定的快照。
    • ZONE 是将创建新磁盘的计算区域。
  3. 使用启动磁盘创建一个新实例。将 [INSTANCE_NAME] 替换为您要创建的实例的名称。

    gcloud compute instances create [INSTANCE_NAME] --disk name=[DISK_NAME],boot=yes \
        --zone=ZONE --tags=wordpress-1-tcp-443,wordpress-1-tcp-80
    

    使用命令中指定的两个标记,由于已经存在为最初的 WordPress 实例创建的防火墙规则,实例被自动允许接收端口 443 和 80 上的传入流量。

    记下上一个命令返回的新实例的外部 IP。

  4. 验证 WordPress 是否在新创建的实例上运行。在新的浏览器标签页上,导航到外部 IP 地址。此时将显示 WordPress 默认着陆页。

  5. 或者,使用控制台中的快照创建一个实例,方法如下:

    1. 在 Cloud Console 中,转到“快照”页面:

      转到“快照”页面

    2. 点击最新的快照。

    3. 点击创建实例

    4. 新的虚拟机实例表单中,点击管理、安全、磁盘、网络、单独租用,然后点击网络

    5. wordpress-1-tcp-443wordpress-1-tcp-80 添加到网络标记字段,添加每个标记后按 Enter 键。如需了解这些标记的说明,请参见上文。

    6. 点击创建

      此时将创建一个基于最新快照的新实例,并准备好提供内容。

  6. 打开 Compute Engine 实例页面,记下新实例的外部 IP。

  7. 验证 WordPress 是否在新创建的实例上运行。在新的浏览器标签页上导航到外部 IP。

如需了解更多详情,请参阅从快照创建实例

清理

  1. 在 Cloud Console 中,转到管理资源页面。

    转到“管理资源”页面

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

后续步骤