使用 Cloud Functions 触发 DAG

Cloud Composer 1 | Cloud Composer 2

本页面介绍如何使用 Cloud Functions 触发 DAG 来响应事件。

Airflow 设计为定期运行 DAG,但您也可以触发 DAG 来响应事件。方法之一是使用 Cloud Functions 在发生指定事件时触发 Cloud Composer DAG。例如,您可以创建一个函数,当 Cloud Storage 存储桶中的对象发生更改时或者当消息推送到 Pub/Sub 主题时触发 DAG。

在本指南的示例中,每当 Cloud Storage 存储分区发生更改时,系统都会运行一个 DAG。对存储桶中的任何对象所做的任何更改都会触发一个函数。此函数向您的 Cloud Composer 环境的 Airflow REST API 发出请求。Airflow 会处理此请求并运行一个 DAG。DAG 会输出更改的相关信息。

准备工作

为您的项目启用 API

控制台

  • 启用 Cloud Composer and Cloud Functions API。

    启用 API

  • gcloud

  • 启用 Cloud Composer and Cloud Functions API。
    gcloud services enable cloudfunctionscomposer
  • 启用 Airflow REST API

    对于 Airflow 2,稳定的 REST API 默认处于启用状态。如果您的环境停用了稳定的 API,则启用稳定的 REST API

    创建 Cloud Storage 存储分区

    此示例会在 Cloud Storage 存储桶中发生更改时触发 DAG。请创建一个新存储桶以用于此示例。

    获取 Airflow 网络服务器网址

    此示例向 Airflow 网络服务器端点发出 REST API 请求。您可以在 Cloud Functions 函数代码中使用 Airflow 网络服务器的网址。

    控制台

    1. 在 Google Cloud Console 中,转到环境页面。

      转到“环境”

    2. 点击您的环境的名称。

    3. 环境详情页面上,转到环境详情标签页。

    4. Airflow 网页界面项中列出了 Airflow 网络服务器的网址。

    gcloud

    运行以下命令:

    gcloud composer environments describe ENVIRONMENT_NAME \
        --location LOCATION \
        --format='value(config.airflowUri)'
    

    替换:

    • ENVIRONMENT_NAME 替换为环境名称。
    • LOCATION 替换为环境所在的 Compute Engine 区域。

    通过 Cloud Functions 函数触发 DAG

    将 DAG 上传到您的环境

    将 DAG 上传到您的环境。以下示例 DAG 输出收到的 DAG 运行配置。您将通过本指南稍后创建的函数触发此 DAG。

    import datetime
    
    import airflow
    from airflow.operators.bash_operator import BashOperator
    
    with airflow.DAG(
            'composer_sample_trigger_response_dag',
            start_date=datetime.datetime(2021, 1, 1),
            # Not scheduled, trigger only
            schedule_interval=None) as dag:
    
        # Print the dag_run's configuration, which includes information about the
        # Cloud Storage object change.
        print_gcs_info = BashOperator(
            task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

    部署触发 DAG 的 Cloud Functions 函数

    使用以下配置参数和内容部署 Python Cloud Functions 函数

    指定 Cloud Functions 函数配置参数

    • 触发器。在本示例中,请选择在存储桶中创建新对象或现有对象被覆盖时触发的触发器。

      • 触发器类型。Cloud Storage。

      • 事件类型完成创建/创建

      • 存储桶。选择必须触发此函数的存储桶。

      • 失败时重试。在本示例中,我们建议停用此选项。如果您在生产环境中使用自己的函数,请启用此选项以处理暂时性错误

    • 运行时服务帐号,在运行时、构建、连接和安全设置部分中。根据您的偏好使用下列选项之一:

      • 选择 Compute Engine 默认服务帐号。此帐号使用默认 IAM 权限,可以运行访问 Cloud Composer 环境的函数。

      • 创建自定义服务帐号,使其具有 Composer User 角色,并将其指定为此函数的运行时服务帐号。此选项遵循最小权限原则。

    • 运行时和入口点,在代码步骤中。为此示例添加代码时,请选择 Python 3.7 或更高版本的运行时,并将 trigger_dag_gcf 指定为入口点。

    添加要求

    requirements.txt 文件中指定依赖项:

    google-auth==2.3.3
    requests==2.26.0
    

    添加使用 Airflow REST API 触发 DAG 的代码

    创建一个名为 composer2_airflow_rest_api.py 的文件,并将进行 Airflow REST API 调用的代码放在此文件中。

    请勿更改任何变量。Cloud Functions 函数会从 main.py 文件导入此文件。

    from typing import Any
    
    import google.auth
    from google.auth.transport.requests import AuthorizedSession
    import requests
    
    # Following GCP best practices, these credentials should be
    # constructed at start-up time and used throughout
    # https://cloud.google.com/apis/docs/client-libraries-best-practices
    AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
    CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])
    
    def make_composer2_web_server_request(url: str, method: str = "GET", **kwargs: Any) -> google.auth.transport.Response:
        """
        Make a request to Cloud Composer 2 environment's web server.
        Args:
          url: The URL to fetch.
          method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
            'PATCH', 'DELETE')
          **kwargs: Any of the parameters defined for the request function:
                    https://github.com/requests/requests/blob/master/requests/api.py
                      If no timeout is provided, it is set to 90 by default.
        """
    
        authed_session = AuthorizedSession(CREDENTIALS)
    
        # Set the default timeout, if missing
        if "timeout" not in kwargs:
            kwargs["timeout"] = 90
    
        return authed_session.request(method, url, **kwargs)
    
    def trigger_dag(web_server_url: str, dag_id: str, data: dict) -> str:
        """
        Make a request to trigger a dag using the stable Airflow 2 REST API.
        https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html
    
        Args:
          web_server_url: The URL of the Airflow 2 web server.
          dag_id: The DAG ID.
          data: Additional configuration parameters for the DAG run (json).
        """
    
        endpoint = f"api/v1/dags/{dag_id}/dagRuns"
        request_url = f"{web_server_url}/{endpoint}"
        json_data = {"conf": data}
    
        response = make_composer2_web_server_request(
            request_url, method="POST", json=json_data
        )
    
        if response.status_code == 403:
            raise requests.HTTPError(
                "You do not have a permission to perform this operation. "
                "Check Airflow RBAC roles for your account."
                f"{response.headers} / {response.text}"
            )
        elif response.status_code != 200:
            response.raise_for_status()
        else:
            return response.text

    添加 Cloud Functions 函数代码

    将以下代码放入 main.py 文件中。将 web_server_url 变量的值替换为您之前获得的 Airflow 网络服务器地址。

    # Copyright 2021 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    """
    Trigger a DAG in a Cloud Composer 2 environment in response to an event,
    using Cloud Functions.
    """
    
    from typing import Any
    
    import composer2_airflow_rest_api
    
    def trigger_dag_gcf(data, context=None):
        """
        Trigger a DAG and pass event data.
    
        Args:
          data: A dictionary containing the data for the event. Its format depends
          on the event.
          context: The context object for the event.
    
        For more information about the arguments, see:
        https://cloud.google.com/functions/docs/writing/background#function_parameters
        """
    
        # TODO(developer): replace with your values
        # Replace web_server_url with the Airflow web server address. To obtain this
        # URL, run the following command for your environment:
        # gcloud composer environments describe example-environment \
        #  --location=your-composer-region \
        #  --format="value(config.airflowUri)"
        web_server_url = (
            "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com"
        )
        # Replace with the ID of the DAG that you want to run.
        dag_id = 'composer_sample_trigger_response_dag'
    
        composer2_airflow_rest_api.trigger_dag(web_server_url, dag_id, data)
    

    测试函数

    要检查函数和 DAG 是否按预期工作,请执行以下操作:

    1. 等待您的函数完成部署。
    2. 向您的 Cloud Storage 存储分区上传一个文件。除此之外,您也可以手动触发函数,方法是在 Google Cloud Console 中为该函数选择测试函数操作。
    3. 查看 Airflow 网页界面中的 DAG 页面。DAG 应具有一个处于活动状态或已完成的 DAG 运行。
    4. 在 Airflow 界面中,检查此运行的任务日志。您应该会看到 print_gcs_info 任务将从函数接收的数据输出到日志中:
    [2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
    [2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
        {bucket: example-storage-for-gcf-triggers, contentType: text/plain,
        crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
        ... }
    [2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
        return code 0h
    

    后续步骤