使用 Cloud Run 函式和 Airflow REST API 觸發 Cloud Composer DAG

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本頁面說明如何使用 Cloud Run 函式,根據事件觸發 Cloud Composer DAG。

Apache Airflow 的設計是定期執行 DAG,但您也可以在發生事件時觸發 DAG。其中一種做法是使用 Cloud Run 函式,在發生特定事件時觸發 Cloud Composer DAG。

本指南中的範例程式會在 Cloud Storage 值區每次有所異動時執行 DAG。值區中任何物件的變更都會觸發函式。這項函式會向 Cloud Composer 環境的 Airflow REST API 發出要求。Airflow 會處理這項要求並執行 DAG。DAG 會輸出變更相關資訊。

事前準備

檢查環境的網路設定

這個解決方案不適用於私人 IP 和 VPC Service Controls 設定,因為在這些設定中,無法設定從 Cloud Run 函式到 Airflow 網頁伺服器的連線。

在 Cloud Composer 2 中,您可以使用其他方法: 使用 Cloud Run 函式和 Pub/Sub 訊息觸發 DAG

為專案啟用 API

主控台

Enable the Cloud Composer and Cloud Run functions APIs.

Enable the APIs

gcloud

Enable the Cloud Composer and Cloud Run functions APIs:

gcloud services enable cloudfunctions.googleapis.com composer.googleapis.com

啟用 Airflow REST API

視 Airflow 版本而定:

使用 Webserver Access Control 允許對 Airflow REST API 進行 API 呼叫

Cloud Run 函式可以使用 IPv4 或 IPv6 位址連線至 Airflow REST API。

如果不確定呼叫 IP 範圍,請在 Webserver Access Control 中使用預設設定選項 All IP addresses have access (default),以免不慎封鎖 Cloud Run 函式。

建立 Cloud Storage 值區

這個範例會在 Cloud Storage bucket 發生變更時觸發 DAG。建立新的 bucket,以便用於這個範例。

取得 Airflow 網路伺服器網址

這個範例會向 Airflow 網頁伺服器端點發出 REST API 要求。 您會在 Cloud Functions 程式碼中使用 Airflow 網頁介面網址中 .appspot.com 前的部分。

主控台

  1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  2. 按一下環境名稱。

  3. 在「Environment details」(環境詳細資料) 頁面中,前往「Environment configuration」(環境設定) 分頁。

  4. Airflow 網路伺服器的網址會列在「Airflow web UI」(Airflow 網頁版 UI) 項目中。

gcloud

執行下列指令:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format='value(config.airflowUri)'

取代:

  • ENVIRONMENT_NAME 替換為環境的名稱。
  • LOCATION 改成環境所在的地區。

取得 IAM 代理程式的 client_id

如要向 Airflow REST API 端點提出要求,您必須為函式提供保護 Airflow 網路伺服器的 Identity and Access Management Proxy 用戶端 ID。

Cloud Composer 不會直接提供這項資訊,而會向 Airflow 網路伺服器發出未經授權的要求,並從重新導向網址中擷取用戶端 ID:

cURL

curl -v AIRFLOW_URL 2>&1 >/dev/null | grep -o "client_id\=[A-Za-z0-9-]*\.apps\.googleusercontent\.com"

請將 AIRFLOW_URL 改成 Airflow 網頁介面的網址。

在輸出內容中,搜尋 client_id 後方的字串。例如:

client_id=836436932391-16q2c5f5dcsfnel77va9bvf4j280t35c.apps.googleusercontent.com

Python

將下列程式碼儲存為 get_client_id.py 檔案。填入 project_idlocationcomposer_environment 的值,然後在 Cloud Shell 或本機環境中執行程式碼。

# This script is intended to be used with Composer 1 environments
# In Composer 2, the Airflow Webserver is not in the tenant project
# so there is no tenant client ID
# See https://cloud.google.com/composer/docs/composer-2/environment-architecture
# for more details
import google.auth
import google.auth.transport.requests
import requests
import six.moves.urllib.parse

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
composer_response = authed_session.request("GET", environment_url)
environment_data = composer_response.json()
composer_version = environment_data["config"]["softwareConfig"]["imageVersion"]
if "composer-1" not in composer_version:
    version_error = (
        "This script is intended to be used with Composer 1 environments. "
        "In Composer 2, the Airflow Webserver is not in the tenant project, "
        "so there is no tenant client ID. "
        "See https://cloud.google.com/composer/docs/composer-2/environment-architecture for more details."
    )
    raise (RuntimeError(version_error))
airflow_uri = environment_data["config"]["airflowUri"]

# The Composer environment response does not include the IAP client ID.
# Make a second, unauthenticated HTTP request to the web server to get the
# redirect URI.
redirect_response = requests.get(airflow_uri, allow_redirects=False)
redirect_location = redirect_response.headers["location"]

# Extract the client_id query parameter from the redirect.
parsed = six.moves.urllib.parse.urlparse(redirect_location)
query_string = six.moves.urllib.parse.parse_qs(parsed.query)
print(query_string["client_id"][0])

將 DAG 上傳至環境

將 DAG 上傳至環境。下列 DAG 範例會輸出收到的 DAG 執行設定。您會從函式觸發這個 DAG,該函式會在稍後建立。

import datetime

import airflow
from airflow.operators.bash import BashOperator


with airflow.DAG(
    "composer_sample_trigger_response_dag",
    start_date=datetime.datetime(2021, 1, 1),
    # Not scheduled, trigger only
    schedule_interval=None,
) as dag:
    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = BashOperator(
        task_id="print_gcs_info", bash_command="echo {{ dag_run.conf }}"
    )

部署會觸發 DAG 的 Cloud 函式

您可以使用 Cloud Run 函式或 Cloud Run 支援的慣用語言,部署 Cloud Functions。本教學課程將示範以 PythonJava 實作的 Cloud 函式。

指定 Cloud Functions 設定參數

  • 觸發條件。以這個範例來說,請選取在值區中建立新物件,或覆寫現有物件時觸發的觸發條件。

    • 觸發條件類型。都能透過多種方式 將資料傳入 Cloud Storage

    • 事件類型完成 / 建立

    • 值區。選取必須觸發這項函式的 bucket。

    • 失敗時重新執行。建議您停用這個選項,以利進行範例操作。如果您在實際工作環境中使用自己的函式,請啟用這個選項來處理暫時性錯誤

  • 「執行階段、建構作業、連線和安全性設定」專區中的「執行階段服務帳戶」。根據偏好設定,使用下列其中一種方式:

    • 選取「Compute Engine 預設服務帳戶」。根據預設 IAM 權限,這個帳戶可以執行存取 Cloud Composer 環境的函式。

    • 建立自訂服務帳戶,並指派 Composer 使用者角色,然後將該帳戶指定為這個函式的執行階段服務帳戶。這個選項遵循最低權限原則。

  • 執行階段和進入點,位於「程式碼」步驟。為這個範例新增程式碼時,請選取 Python 3.7 以上版本做為執行階段,並指定 trigger_dag 做為進入點。

新增要求

requirements.txt 檔案中指定依附元件:

requests-toolbelt==1.0.0
google-auth==2.38.0
google-cloud-pubsub==2.28.0

將下列程式碼放入 main.py 檔案,並進行下列替換:

  • client_id 變數的值替換為您先前取得的 client_id 值。

  • webserver_id 變數的值替換為您的租戶專案 ID,這是 Airflow 網頁介面網址中 .appspot.com 前的部分。您先前已取得 Airflow 網頁介面網址。

  • 指定您使用的 Airflow REST API 版本:

    • 如果您使用穩定版 Airflow REST API,請將 USE_EXPERIMENTAL_API 變數設為 False
    • 如果您使用實驗性 Airflow REST API,則無需進行任何變更。USE_EXPERIMENTAL_API 變數已設為 True


from google.auth.transport.requests import Request
from google.oauth2 import id_token
import requests


IAM_SCOPE = "https://www.googleapis.com/auth/iam"
OAUTH_TOKEN_URI = "https://www.googleapis.com/oauth2/v4/token"
# If you are using the stable API, set this value to False
# For more info about Airflow APIs see https://cloud.google.com/composer/docs/access-airflow-api
USE_EXPERIMENTAL_API = True


def trigger_dag(data, context=None):
    """Makes a POST request to the Composer DAG Trigger API

    When called via Google Cloud Functions (GCF),
    data and context are Background function parameters.

    For more info, refer to
    https://cloud.google.com/functions/docs/writing/background#functions_background_parameters-python

    To call this function from a Python script, omit the ``context`` argument
    and pass in a non-null value for the ``data`` argument.

    This function is currently only compatible with Composer v1 environments.
    """

    # Fill in with your Composer info here
    # Navigate to your webserver's login page and get this from the URL
    # Or use the script found at
    # https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/composer/rest/get_client_id.py
    client_id = "YOUR-CLIENT-ID"
    # This should be part of your webserver's URL:
    # {tenant-project-id}.appspot.com
    webserver_id = "YOUR-TENANT-PROJECT"
    # The name of the DAG you wish to trigger
    dag_name = "composer_sample_trigger_response_dag"

    if USE_EXPERIMENTAL_API:
        endpoint = f"api/experimental/dags/{dag_name}/dag_runs"
        json_data = {"conf": data, "replace_microseconds": "false"}
    else:
        endpoint = f"api/v1/dags/{dag_name}/dagRuns"
        json_data = {"conf": data}
    webserver_url = "https://" + webserver_id + ".appspot.com/" + endpoint
    # Make a POST request to IAP which then Triggers the DAG
    make_iap_request(webserver_url, client_id, method="POST", json=json_data)


# This code is copied from
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/iap/make_iap_request.py
# START COPIED IAP CODE
def make_iap_request(url, client_id, method="GET", **kwargs):
    """Makes a request to an application protected by Identity-Aware Proxy.
    Args:
      url: The Identity-Aware Proxy-protected URL to fetch.
      client_id: The client ID used by Identity-Aware Proxy.
      method: The request method to use
              ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                If no timeout is provided, it is set to 90 by default.
    Returns:
      The page body, or raises an exception if the page couldn't be retrieved.
    """
    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    # Obtain an OpenID Connect (OIDC) token from metadata server or using service
    # account.
    google_open_id_connect_token = id_token.fetch_id_token(Request(), client_id)

    # Fetch the Identity-Aware Proxy-protected URL, including an
    # Authorization header containing "Bearer " followed by a
    # Google-issued OpenID Connect token for the service account.
    resp = requests.request(
        method,
        url,
        headers={"Authorization": "Bearer {}".format(google_open_id_connect_token)},
        **kwargs,
    )
    if resp.status_code == 403:
        raise Exception(
            "Service account does not have permission to "
            "access the IAP-protected application."
        )
    elif resp.status_code != 200:
        raise Exception(
            "Bad response from application: {!r} / {!r} / {!r}".format(
                resp.status_code, resp.headers, resp.text
            )
        )
    else:
        return resp.text


# END COPIED IAP CODE

測試函式

如要確認函式和 DAG 是否正常運作,請按照下列步驟操作:

  1. 等待函式部署完成。
  2. 請上傳一個檔案至 Cloud Storage 值區。或者,您也可以在 Google Cloud 主控台中選取函式的「測試函式」動作,手動觸發函式。
  3. 在 Airflow 網頁介面中查看 DAG 頁面。DAG 應有一個有效或已完成的 DAG 執行作業。
  4. 在 Airflow UI 中,查看這項執行的工作記錄。您應該會看到 print_gcs_info 工作將從函式收到的資料輸出至記錄檔:
[2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
[2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
    {bucket: example-storage-for-gcf-triggers, contentType: text/plain,
    crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
    ... }
[2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
    return code 0h

後續步驟