在 Cloud Composer 1 中运行 Apache Airflow DAG

Cloud Composer 1 | Cloud Composer 2

本页面介绍如何在 Cloud Composer 中创建 Cloud Composer 环境和运行 Apache Airflow DAG。

如果您刚接触 Airflow,请参阅此教程详细了解 Airflow 概念、对象及其用法。

准备工作

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  3. 确保您的 Google Cloud 项目已启用结算功能

  4. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  5. 确保您的 Google Cloud 项目已启用结算功能

  6. 启用 Cloud Composer API。

    启用 API

创建环境

控制台

  1. 在 Google Cloud 控制台中,前往创建环境页面。

    转到“创建环境”

  2. 名称字段中,输入 example-environment

  3. 位置下拉列表中,为 Cloud Composer 环境选择一个区域。如需了解如何选择区域,请参阅可用区域

  4. 对于其他环境配置选项,请使用提供的默认值。

  5. 如需创建此环境,请点击创建

  6. 等待环境创建完成。完成后,相应环境名称旁边会显示一个绿色对勾标记。

gcloud

将 Cloud Composer Service Agent 帐号添加为环境服务帐号的新主帐号,然后向其授予 Cloud Composer v2 API Service Agent Extension (roles/composer.ServiceAgentV2Ext) 角色。

默认情况下,您的环境使用默认 Compute Engine 服务帐号

# Get current project's project number
PROJECT_NUMBER=$(gcloud projects list \
  --filter="$(gcloud config get-value project)" \
  --format="value(PROJECT_NUMBER)" \
  --limit=1)

# Add the Cloud Composer v2 API Service Agent Extension role
gcloud iam service-accounts add-iam-policy-binding \
    $PROJECT_NUMBER-compute@developer.gserviceaccount.com \
    --member serviceAccount:service-$PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com \
    --role roles/composer.ServiceAgentV2Ext

创建新环境:

gcloud composer environments create ENVIRONMENT_NAME \
  --location LOCATION \
  --image-version IMAGE_VERSION

您需要在其中:

  • ENVIRONMENT_NAME 替换为环境的名称。 本快速入门使用的是 example-environment
  • LOCATION 替换为用于 Cloud Composer 环境的某个区域。如需了解如何选择区域,请参阅可用区域
  • IMAGE_VERSION 替换为 Cloud Composer 映像的名称。本指南使用 composer-1.20.12-airflow-1.10.15 创建包含最新 Cloud Composer 映像的环境。

示例:

gcloud composer environments create example-environment \
  --location us-central1 \
  --image-version composer-1.20.12-airflow-1.10.15

Terraform

要使用 Terraform 配置此环境,请将以下资源块添加到 Terraform 配置并运行 terraform apply

如需使用此资源块,Terraform 使用的服务帐号必须具有已启用 composer.environments.create 权限的角色。如需详细了解 Terraform 的服务帐号,请参阅 Google 提供程序配置参考

如需详细了解如何使用 Terraform 创建 Cloud Composer 环境,请参阅 Terraform 文档

resource "google_composer_environment" "example" {
  name = "ENVIRONMENT_NAME"
  region = "LOCATION"

  config {
    software_config {
      image_version = "IMAGE_VERSION"
    }
  }
}
  • ENVIRONMENT_NAME 替换为环境的名称。 本快速入门使用的是 example-environment

  • LOCATION 替换为用于 Cloud Composer 环境的某个区域。如需了解如何选择区域,请参阅可用区域

  • IMAGE_VERSION 替换为 Cloud Composer 映像的名称。本指南使用 composer-1.20.12-airflow-1.10.15 创建包含最新 Cloud Composer 映像的环境。

示例:

resource "google_composer_environment" "example" {
  name = "example-environment"
  region = "us-central1"

  config {
    software_config {
      image_version = "composer-1.20.12-airflow-1.10.15"
    }
  }

}

查看环境详情

创建完环境后,您可以查看环境的信息,例如 Cloud Composer 版本、Airflow 网页界面的网址,以及 Cloud Storage 中的 DAGs 文件夹。

如需查看环境信息,请执行以下操作:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 如需查看环境详情页面,请点击您的环境名称 example-environment

创建 DAG

Airflow DAG 是您要安排和运行的有序任务的集合。DAG 在标准 Python 文件中定义。

quickstart.py 中的 Python 代码执行以下操作:

  1. 创建一个 DAG composer_sample_dag。此 DAG 每天运行一次。
  2. 执行一项 print_dag_run_conf 任务。该任务使用 bash 运算符输出 DAG 运行的配置。

如需创建 DAG,请在本地机器上创建 quickstart.py 文件的副本。

Airflow 1

import datetime

import airflow
from airflow.operators import bash_operator

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with airflow.DAG(
    "composer_sample_dag",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash_operator.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

Airflow 2

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

将 DAG 上传到 Cloud Storage

Cloud Composer 只会安排位于环境的 Cloud Storage 存储桶所含 /dags 文件夹中的 DAG。

如需安排 DAG,请将 quickstart.py 从本地机器上传到您的环境的 /dags 文件夹。

控制台

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 如需打开 /dags 文件夹,请访问 example-environmentDAGs 文件夹链接。

  3. 点击“存储分区详情”页面上的上传文件,然后选择 quickstart.py 的本地副本。

  4. 如需上传该文件,请点击打开

    上传您的 DAG 之后,Cloud Composer 会将该 DAG 添加到 Airflow 中并立即安排一个 DAG 运行。该 DAG 可能要过几分钟才能显示在 Airflow 网页界面中。

gcloud

要使用 gcloud 上传 quickstart.py,请运行以下命令:

gcloud composer environments storage dags import \
--environment example-environment  --location us-central1 \
--source quickstart.py

在 Airflow 界面中查看 DAG。

每个 Cloud Composer 环境都有一个 Web 服务器,用于运行 Airflow 网页界面。您可以通过 Airflow 网页界面管理 DAG。

如需在 Airflow 网页界面中查看 DAG,请执行以下操作:

Airflow 1

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 如需打开 Airflow 网页界面,请点击 example-environmentAirflow 链接。Airflow 界面会在新的浏览器窗口中打开。

  3. 在 Airflow 工具栏中,转到 DAG 页面。

  4. 如需打开 DAG 详情页面,请点击 composer_sample_dag

    Airflow 界面中的 DAG 页面
    图 1:Airflow 界面中的 DAG 页面(点击可放大)

    DAG 的页面会显示树状视图,这是工作流的任务和依赖项的图形表示。

    composer_sample_dags DAG 的树状视图
    图 2:composer_sample_dags DAG 的树状视图

Airflow 2

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 如需打开 Airflow 网页界面,请点击 example-environmentAirflow 链接。Airflow 界面会在新的浏览器窗口中打开。

  3. 在 Airflow 工具栏中,转到 DAG 页面。

  4. 如需打开 DAG 详情页面,请点击 composer_sample_dag

    Airflow 界面中的 DAG 页面
    图 1:Airflow 界面中的 DAG 页面(点击可放大)

    DAG 的页面会显示树状视图,这是工作流的任务和依赖项的图形表示。

    composer_sample_dags DAG 的树状视图
    图 2:composer_sample_dags DAG 的树状视图

在 Airflow 日志中查看任务实例详情

您安排的 DAG 包含 print_dag_run_conf 任务。该任务会输出 DAG 运行的配置,您可以在任务实例的 Airflow 日志中查看这项配置。

如需查看任务实例详情,请执行以下操作:

Airflow 1

  1. 在 Airflow 网页界面的 DAG 树状视图中,点击 Graph View

    如果将鼠标指针悬停在 print_dag_run_conf 任务上,系统会显示该任务的状态。

    composer_sample_dags DAG 的树状视图
    图 3. print_dag_run_conf 任务的状态
  2. 点击 print_dag_run_conf 任务。

    任务实例上下文菜单中,您可以获取元数据,并执行某些操作。

    composer_sample_dags 任务的任务实例上下文菜单
    图 4.composer_sample_dags 任务的任务实例上下文菜单
  3. 任务实例上下文菜单中,点击 View Log

  4. 在日志中,查找 Running: ['bash' 以查看 bash 运算符的输出。

    Bash 运算符日志输出
    图 5.Bash 运算符日志输出

Airflow 2

  1. 在 Airflow 网页界面的 DAG 树状视图中,点击 Graph View

    如果将鼠标指针悬停在 print_dag_run_conf 任务上,系统会显示该任务的状态。

    composer_sample_dags DAG 的树状视图
    图 3. print_dag_run_conf 任务的状态
  2. 点击 print_dag_run_conf 任务。

    任务实例上下文菜单中,您可以获取元数据,并执行某些操作。

    composer_sample_dags 任务的任务实例上下文菜单
    图 4.composer_sample_dags 任务的任务实例上下文菜单
  3. 任务实例上下文菜单中,点击 View Log

  4. 在日志中,查找 Running command: ['bash' 以查看 bash 运算符的输出。

    [2021-10-04 15:27:21,029] {subprocess.py:63} INFO - Running command:
    ['bash', '-c', 'echo 735']
    [2021-10-04 15:27:21,167] {subprocess.py:74} INFO - Output:
    [2021-10-04 15:27:21,168] {subprocess.py:78} INFO - 735
    [2021-10-04 15:27:21,168] {subprocess.py:82} INFO - Command exited with
    return code 0
    

清理

为避免因本页中使用的资源导致您的 Google Cloud 账号产生费用,请按照以下步骤操作。

删除本教程中使用的资源

  1. 删除 Cloud Composer 环境:

    1. 在 Google Cloud 控制台中,前往环境页面。

      转到“环境”

    2. 选择 example-environment,并点击删除

    3. 等待环境删除完成。

  2. 删除环境的存储桶。删除 Cloud Composer 环境不会删除其存储桶。

    1. 在 Google Cloud 控制台中,转到存储 > 浏览器页面。

      转到“存储”>“浏览器”

    2. 选择环境的存储桶,然后点击删除。例如,此存储桶可以命名为 us-central1-example-environ-c1616fe8-bucket

  3. 删除您的环境的 Redis 队列的永久性磁盘。删除 Cloud Composer 环境并不会删除其永久性磁盘。

    1. 在 Google Cloud 控制台中,前往 Compute Engine > 磁盘

      转到“磁盘”

    2. 选择环境的 Redis 队列永久性磁盘,然后点击删除

      例如,此磁盘可以命名为 gke-us-central1-exampl-pvc-b12055b6-c92c-43ff-9de9-10f2cc6fc0ee。适用于 Cloud Composer 1 的磁盘始终为 Standard persistent disk 类型,大小为 2 GB。

后续步骤