快速入门

Cloud Composer 1 | Cloud Composer 2

本页面介绍如何在 Cloud Composer 2 中创建 Cloud Composer 环境并运行 Apache Airflow DAG。

准备工作

  1. 登录您的 Google Cloud 帐号。如果您是 Google Cloud 新手,请创建一个帐号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 在 Google Cloud Console 的项目选择器页面上,选择或创建一个 Google Cloud 项目。

    转到“项目选择器”

  3. 确保您的 Cloud 项目已启用结算功能。 了解如何确认您的项目是否已启用结算功能

  4. 在 Google Cloud Console 的项目选择器页面上,选择或创建一个 Google Cloud 项目。

    转到“项目选择器”

  5. 确保您的 Cloud 项目已启用结算功能。 了解如何确认您的项目是否已启用结算功能

  6. 启用 Cloud Composer API。

    启用 API

创建环境

控制台

  1. 在 Google Cloud Console 中,转到创建环境页面。

    转到“创建环境”

  2. 名称字段中,输入 example-environment

  3. 位置下拉列表中,为 Cloud Composer 环境选择一个区域。如需了解如何选择区域,请参阅可用区域

  4. 对于其他环境配置选项,请使用提供的默认值。

  5. 如需创建此环境,请点击创建

  6. 等待环境创建完成。完成后,相应环境名称旁边会显示一个绿色对勾标记。

gcloud

gcloud composer environments create ENVIRONMENT_NAME \
    --location LOCATION \
    --image-version IMAGE_VERSION

替换:

  • ENVIRONMENT_NAME 替换为环境的名称。 本快速入门使用的是 example-environment

  • LOCATION 替换为用于 Cloud Composer 环境的某个区域。如需了解如何选择区域,请参阅可用区域

  • IMAGE_VERSION 替换为 Cloud Composer 映像的名称。

例如:

gcloud composer environments create example-environment \
    --location us-central1 \
    --image-version composer-2.0.0-preview.3-airflow-2.1.2

Terraform

要使用 Terraform 配置此环境,请将以下资源块添加到 Terraform 配置并运行 terraform apply

如需详细了解如何使用 Terraform 创建 Cloud Composer 环境,请参阅 Terraform 文档

resource "google_composer_environment" "example" {
  name = "ENVIRONMENT_NAME"
  region = "LOCATION"

  config {
    software_config {
      image_version = "IMAGE_VERSION"
    }
  }
}

替换:

  • ENVIRONMENT_NAME 替换为环境的名称。

  • LOCATION 替换为环境的区域。

    位置是指 Cloud Composer 环境所使用的一个区域。如需了解如何选择区域,请参阅可用区域

  • IMAGE_VERSION 替换为 Cloud Composer 映像的名称。

例如:

resource "google_composer_environment" "example" {
  name = "example-environment"
  region = "us-central1"

  config {
    software_config {
      image_version = "composer-2.0.0-preview.3-airflow-2.1.2"
    }
  }

}

查看环境详情

创建完环境后,您可以查看环境的信息,例如 Cloud Composer 版本、Airflow 网页界面的网址,以及 Cloud Storage 中的 DAGs 文件夹。

如需查看环境信息,请执行以下操作:

  1. 在 Google Cloud Console 中,转到环境页面。

    转到“环境”

  2. 如需查看环境详情页面,请点击您的环境名称 example-environment

创建 DAG

Airflow DAG 是您要安排和运行的有序任务的集合。DAG 在标准 Python 文件中定义。

quickstart.py 中的 Python 代码执行以下操作:

  1. 创建一个 DAG composer_sample_dag。此 DAG 每天运行一次。
  2. 执行一项 print_dag_run_conf 任务。该任务使用 bash 运算符输出 DAG 运行的配置。

如需创建 DAG,请在本地机器上创建 quickstart.py 文件的副本。

import datetime

import airflow
from airflow.operators import bash

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with airflow.DAG(
        'composer_sample_dag',
        'catchup=False',
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id='print_dag_run_conf', bash_command='echo {{ dag_run.id }}')

将 DAG 上传到 Cloud Storage

Cloud Composer 只会安排位于环境的 Cloud Storage 存储桶所含 /dags 文件夹中的 DAG。

如需安排 DAG,请将 quickstart.py 从本地机器上传到您的环境的 /dags 文件夹。

控制台

  1. 在 Google Cloud Console 中,转到环境页面。

    转到“环境”

  2. 如需打开 /dags 文件夹,请访问 example-environmentDAGs 文件夹链接。

  3. 点击“存储分区详情”页面上的上传文件,然后选择 quickstart.py 的本地副本。

  4. 如需上传该文件,请点击打开

    上传您的 DAG 之后,Cloud Composer 会将该 DAG 添加到 Airflow 中并立即安排一个 DAG 运行。该 DAG 可能要过几分钟才能显示在 Airflow 网页界面中。

gcloud

要使用 gcloud 上传 quickstart.py,请运行以下命令:

gcloud composer environments storage dags import \
--environment example-environment  --location us-central1 \
--source quickstart.py

在 Airflow 网页界面中查看 DAG

每个 Cloud Composer 环境都有一个 Web 服务器,用于运行 Airflow 网页界面。您可以通过 Airflow 网页界面管理 DAG。

如需在 Airflow 网页界面中查看 DAG,请执行以下操作:

  1. 在 Google Cloud Console 中,转到环境页面。

    转到“环境”

  2. 如需打开 Airflow 网页界面,请点击 example-environmentAirflow 链接。Airflow 界面会在新的浏览器窗口中打开。

  3. 在 Airflow 工具栏中,转到 DAG 页面。

  4. 如需打开 DAG 详情页面,请点击 composer_sample_dag

    DAG 的页面会显示树状视图,这是工作流的任务和依赖项的图形表示。

在 Airflow 日志中查看任务实例详情

您安排的 DAG 包含 print_dag_run_conf 任务。该任务会输出 DAG 运行的配置,您可以在任务实例的 Airflow 日志中查看这项配置。

如需查看任务实例详情,请执行以下操作:

  1. 在 Airflow 网页界面的 DAG 树状视图中,点击 Graph View

    如果将鼠标指针悬停在 print_dag_run_conf 任务上,系统会显示该任务的状态。

  2. 点击 print_dag_run_conf 任务。

    任务实例上下文菜单中,您可以获取元数据,并执行某些操作。

  3. 任务实例上下文菜单中,点击 View Log

  4. 在日志中,查找 Running command: ['bash' 以查看 bash 运算符的输出。

    [2021-10-04 15:27:21,029] {subprocess.py:63} INFO - Running command:
    ['bash', '-c', 'echo 735']
    [2021-10-04 15:27:21,167] {subprocess.py:74} INFO - Output:
    [2021-10-04 15:27:21,168] {subprocess.py:78} INFO - 735
    [2021-10-04 15:27:21,168] {subprocess.py:82} INFO - Command exited with
    return code 0
    

清理

为避免系统因本页中使用的资源向您的 Google Cloud 帐号收取费用,请按照以下步骤操作。

从下列选项中选择一项:

  • 最为简单的清理方法是删除您为快速入门创建的项目。
  • 您还可以删除各个资源。

删除项目

  1. 在 Cloud Console 中,转到管理资源页面。

    转到“管理资源”

  2. 如果您打算删除的项目已附加到某个组织,请展开名称列中的组织列表。
  3. 在项目列表中,选择要删除的项目,然后点击删除
  4. 在对话框中输入项目 ID,然后点击关闭以删除项目。

删除各个资源

您可以删除本教程中使用的资源,而不必删除项目:

  1. 删除 Cloud Composer 环境:

    1. 在 Google Cloud Console 中,转到环境页面。

      转到“环境”

    2. 选择 example-environment,并点击删除

    3. 等待环境删除完成。

  2. 删除环境的存储桶。删除 Cloud Composer 环境不会删除其存储分区。

    1. 在 Google Cloud Console 中,转到存储 > 浏览器页面。

      转到“存储”>“浏览器”

    2. 选择环境的存储桶,然后点击删除。例如,此存储桶可以命名为 us-central1-example-environ-c1616fe8-bucket

  3. 删除您的环境的 Redis 队列的永久性磁盘。删除 Cloud Composer 环境并不会删除其永久性磁盘。

    1. 在 Google Cloud Console 中,转到 Compute Engine > 磁盘

      转到“磁盘”

    2. 选择环境的 Redis 队列永久性磁盘,然后点击删除。例如,此磁盘可以命名为 pvc-02bc4842-2312-4347-8519-d87bdcd31115。此类磁盘始终为 Balanced persistent disk 类型,大小为 2 GB。

后续步骤