Trigger DAGs with Cloud Functions

Cloud Composer 1 | Cloud Composer 2

This page describes how to use Cloud Functions to trigger DAGs in response to events.

Airflow is designed to run DAGs on a regular schedule, but you can also trigger DAGs in response to events. One way to do this is to use Cloud Functions to trigger Cloud Composer DAGs when a specified event occurs. For example, you can create a function that triggers a DAG when an object changes in a Cloud Storage bucket, or when a message is pushed to a Pub/Sub topic.

In the example in this guide, a DAG runs every time a change occurs in a Cloud Storage bucket. A change to any object in the bucket triggers a function. The function makes a request to the Airflow REST API of your Cloud Composer environment. Airflow processes the request and runs a DAG. The DAG outputs information about the change.

Before you begin

Enable APIs for your project

Console

  • Enable the Cloud Composer and Cloud Functions APIs.

    Enable the APIs

gcloud

  • Enable the Cloud Composer and Cloud Functions APIs:

    gcloud services enable cloudfunctions.googleapis.com composer.googleapis.com

Enable the Airflow REST API

Depending on your version of Airflow, do the following:
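If you use Airflow 2, the stable REST API is enabled by default. If you use Airflow 1, the experimental REST API can be enabled by overriding the [api] auth_backend Airflow configuration option. A minimal gcloud sketch, assuming an Airflow 1 environment:

    gcloud composer environments update ENVIRONMENT_NAME \
        --location LOCATION \
        --update-airflow-configs=api-auth_backend=airflow.api.auth.backend.default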

Create a Cloud Storage bucket

This example triggers a DAG in response to changes in a Cloud Storage bucket. Create a new bucket to use in this example.
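A new bucket can be created from the command line; a minimal sketch using the example bucket name that also appears in the logs later in this guide:

    gsutil mb gs://example-storage-for-gcf-triggers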

Get the Airflow web server URL

This example makes REST API requests to the Airflow web server endpoint. You use the part of the Airflow web interface URL before .appspot.com in your Cloud Function code.

Console

1. In the Google Cloud Console, go to the Environments page.

   Go to Environments

2. Click the name of your environment.

3. On the Environment details page, go to the Environment details tab.

4. The URL of the Airflow web server is listed in the Airflow web UI item.

gcloud

Run the following command:

    gcloud composer environments describe ENVIRONMENT_NAME \
        --location LOCATION \
        --format='value(config.airflowUri)'
    

Replace the following:

• ENVIRONMENT_NAME with the name of the environment.
• LOCATION with the region where the environment is located.
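The command prints the Airflow web interface URL. For a Composer 1 environment, it ends in .appspot.com, and the part before .appspot.com is the tenant project ID that you later use as webserver_id. The value below is a hypothetical example:

    https://a1b2c3d4e5f6g7h8-tp.appspot.com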

Get the client_id of the IAM proxy

To make a request to the Airflow REST API endpoint, the function needs the client ID of the IAM proxy that protects the Airflow web server.

Cloud Composer does not provide this information directly. Instead, make an unauthenticated request to the Airflow web server and capture the client ID from the redirect URL.

cURL

    curl -v AIRFLOW_URL 2>&1 >/dev/null | grep -o "client_id\=[A-Za-z0-9-]*\.apps\.googleusercontent\.com"
    

Replace AIRFLOW_URL with the URL of the Airflow web interface.

In the output, search for the string that follows client_id. For example:

    client_id=836436932391-16q2c5f5dcsfnel77va9bvf4j280t35c.apps.googleusercontent.com
    

Python

Save the following code in a file named get_client_id.py. Fill in the values for project_id, location, and composer_environment, then run the code in Cloud Shell or in your local environment.

    # This script is intended to be used with Composer 1 environments
    # In Composer 2, the Airflow Webserver is not in the tenant project
    # so there is no tenant client ID
    # See https://cloud.google.com/composer/docs/composer-2/environment-architecture
    # for more details
    import google.auth
    import google.auth.transport.requests
    import requests
    import six.moves.urllib.parse
    
    # Authenticate with Google Cloud.
    # See: https://cloud.google.com/docs/authentication/getting-started
    credentials, _ = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    authed_session = google.auth.transport.requests.AuthorizedSession(credentials)
    
    # project_id = 'YOUR_PROJECT_ID'
    # location = 'us-central1'
    # composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'
    
    environment_url = (
        "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
        "/environments/{}"
    ).format(project_id, location, composer_environment)
    composer_response = authed_session.request("GET", environment_url)
    environment_data = composer_response.json()
    composer_version = environment_data["config"]["softwareConfig"]["imageVersion"]
    if "composer-1" not in composer_version:
        version_error = ("This script is intended to be used with Composer 1 environments. "
                         "In Composer 2, the Airflow Webserver is not in the tenant project, "
                         "so there is no tenant client ID. "
                         "See https://cloud.google.com/composer/docs/composer-2/environment-architecture for more details.")
        raise RuntimeError(version_error)
    airflow_uri = environment_data["config"]["airflowUri"]
    
    # The Composer environment response does not include the IAP client ID.
    # Make a second, unauthenticated HTTP request to the web server to get the
    # redirect URI.
    redirect_response = requests.get(airflow_uri, allow_redirects=False)
    redirect_location = redirect_response.headers["location"]
    
    # Extract the client_id query parameter from the redirect.
    parsed = six.moves.urllib.parse.urlparse(redirect_location)
    query_string = six.moves.urllib.parse.parse_qs(parsed.query)
    print(query_string["client_id"][0])
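
With the three values filled in, the script can be run directly and prints the client ID to stdout:

    python3 get_client_id.py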

Trigger a DAG using a Cloud Function

Upload a DAG to your environment

Upload a DAG to your environment. The following example DAG outputs the DAG run configuration that it receives. You trigger this DAG from the function that you create later in this guide. A gcloud upload sketch follows the example code.

    import datetime
    
    import airflow
    from airflow.operators.bash_operator import BashOperator
    
    with airflow.DAG(
            'composer_sample_trigger_response_dag',
            start_date=datetime.datetime(2021, 1, 1),
            # Not scheduled, trigger only
            schedule_interval=None) as dag:
    
        # Print the dag_run's configuration, which includes information about the
        # Cloud Storage object change.
        print_gcs_info = BashOperator(
            task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
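
One way to upload the DAG file is with the gcloud CLI. A minimal sketch, assuming the example DAG is saved locally as trigger_response_dag.py (a hypothetical file name):

    gcloud composer environments storage dags import \
        --environment ENVIRONMENT_NAME \
        --location LOCATION \
        --source trigger_response_dag.py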

Deploy a Cloud Function that triggers the DAG

Deploy a Python Cloud Function with the following configuration parameters and contents.

Specify Cloud Function configuration parameters

• Trigger. For this example, select a trigger that fires when a new object is created in a bucket, or an existing object is overwritten.

  • Trigger type. Cloud Storage.

  • Event type. Finalize / Create.

  • Bucket. Select the bucket that must trigger this function.

  • Retry on failure. We recommend disabling this option for this example. If you use your own function in a production environment, enable this option to handle transient errors.

• Runtime service account, in the Runtime, build, connections and security settings section. Use one of the following options, depending on your preferences:

  • Select the Compute Engine default service account. With default IAM permissions, this account can run functions that access Cloud Composer environments.

  • Create a custom service account that has the Composer User role and specify it as the runtime service account for this function. This option follows the principle of least privilege.

• Runtime and entry point, in the Code step. When adding code for this example, select the Python 3.7 or later runtime and specify trigger_dag as the entry point. (An equivalent gcloud deployment sketch follows this list.)
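The same configuration can also be applied from the command line. A minimal sketch, assuming a 1st gen function with the hypothetical name trigger-dag-function, the example bucket from earlier, and main.py plus requirements.txt (described below) in the current directory:

    gcloud functions deploy trigger-dag-function \
        --runtime python39 \
        --entry-point trigger_dag \
        --trigger-resource example-storage-for-gcf-triggers \
        --trigger-event google.storage.object.finalize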

Add requirements

Specify the dependencies in the requirements.txt file:

    requests_toolbelt==0.9.1
    google-auth==2.6.2
    

Add the Cloud Function code

Put the following code in the main.py file and make the following replacements:

• Replace the value of the client_id variable with the client_id value that you obtained earlier.

• Replace the value of the webserver_id variable with your tenant project ID, which is the part of the Airflow web interface URL before .appspot.com. You obtained the Airflow web interface URL earlier.

• Specify the version of the Airflow REST API that you use:

  • If you use the stable Airflow REST API, set the USE_EXPERIMENTAL_API variable to False.
  • If you use the experimental Airflow REST API, no changes are needed. The USE_EXPERIMENTAL_API variable is already set to True.
    
    from google.auth.transport.requests import Request
    from google.oauth2 import id_token
    import requests
    
    IAM_SCOPE = 'https://www.googleapis.com/auth/iam'
    OAUTH_TOKEN_URI = 'https://www.googleapis.com/oauth2/v4/token'
    # If you are using the stable API, set this value to False
    # For more info about Airflow APIs see https://cloud.google.com/composer/docs/access-airflow-api
    USE_EXPERIMENTAL_API = True
    
    def trigger_dag(data, context=None):
        """Makes a POST request to the Composer DAG Trigger API
    
        When called via Google Cloud Functions (GCF),
        data and context are Background function parameters.
    
        For more info, refer to
        https://cloud.google.com/functions/docs/writing/background#functions_background_parameters-python
    
        To call this function from a Python script, omit the ``context`` argument
        and pass in a non-null value for the ``data`` argument.
    
        This function is currently only compatible with Composer v1 environments.
        """
    
        # Fill in with your Composer info here
        # Navigate to your webserver's login page and get this from the URL
        # Or use the script found at
        # https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/composer/rest/get_client_id.py
        client_id = 'YOUR-CLIENT-ID'
        # This should be part of your webserver's URL:
        # {tenant-project-id}.appspot.com
        webserver_id = 'YOUR-TENANT-PROJECT'
        # The name of the DAG you wish to trigger
        dag_name = 'composer_sample_trigger_response_dag'
    
        if USE_EXPERIMENTAL_API:
            endpoint = f'api/experimental/dags/{dag_name}/dag_runs'
            json_data = {'conf': data, 'replace_microseconds': 'false'}
        else:
            endpoint = f'api/v1/dags/{dag_name}/dagRuns'
            json_data = {'conf': data}
        webserver_url = (
            'https://'
            + webserver_id
            + '.appspot.com/'
            + endpoint
        )
        # Make a POST request to IAP which then Triggers the DAG
        make_iap_request(
            webserver_url, client_id, method='POST', json=json_data)
    
    # This code is copied from
    # https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/iap/make_iap_request.py
    # START COPIED IAP CODE
    def make_iap_request(url, client_id, method='GET', **kwargs):
        """Makes a request to an application protected by Identity-Aware Proxy.
        Args:
          url: The Identity-Aware Proxy-protected URL to fetch.
          client_id: The client ID used by Identity-Aware Proxy.
          method: The request method to use
                  ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE')
          **kwargs: Any of the parameters defined for the request function:
                    https://github.com/requests/requests/blob/master/requests/api.py
                    If no timeout is provided, it is set to 90 by default.
        Returns:
          The page body, or raises an exception if the page couldn't be retrieved.
        """
        # Set the default timeout, if missing
        if 'timeout' not in kwargs:
            kwargs['timeout'] = 90
    
        # Obtain an OpenID Connect (OIDC) token from metadata server or using service
        # account.
        google_open_id_connect_token = id_token.fetch_id_token(Request(), client_id)
    
        # Fetch the Identity-Aware Proxy-protected URL, including an
        # Authorization header containing "Bearer " followed by a
        # Google-issued OpenID Connect token for the service account.
        resp = requests.request(
            method, url,
            headers={'Authorization': 'Bearer {}'.format(
                google_open_id_connect_token)}, **kwargs)
        if resp.status_code == 403:
            raise Exception('Service account does not have permission to '
                            'access the IAP-protected application.')
        elif resp.status_code != 200:
            raise Exception(
                'Bad response from application: {!r} / {!r} / {!r}'.format(
                    resp.status_code, resp.headers, resp.text))
        else:
            return resp.text
    # END COPIED IAP CODE
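
As the docstring notes, trigger_dag can also be exercised from a plain Python script: pass event-like data and omit the context argument. A sketch with a hypothetical payload, assuming client_id and webserver_id are filled in and local credentials are available:

    # Hypothetical local test; mimics the metadata that Cloud Storage sends.
    trigger_dag({'bucket': 'example-storage-for-gcf-triggers', 'name': 'test.txt'})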
    

Test the function

To check that the function and the DAG work as intended, do the following:

1. Wait until your function deploys.
2. Upload a file to your Cloud Storage bucket (see the command sketch after the log output). As an alternative, you can trigger the function manually by selecting the Test the function action for it in the Google Cloud Console.
3. Check the DAGs page in the Airflow web interface. The DAG should have one active or already completed DAG run.
4. In the Airflow UI, check the task logs for this run. You should see that the print_gcs_info task outputs the data received from the function to the logs:
    [2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
    [2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
        {bucket: example-storage-for-gcf-triggers, contentType: text/plain,
        crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
        ... }
    [2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
        return code 0
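
For step 2, a test file can be uploaded from the command line; a minimal sketch with a hypothetical local file name:

    gsutil cp test.txt gs://example-storage-for-gcf-triggers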
    

What's next