Cloud Functions 및 Airflow REST API로 Cloud Composer DAG 트리거

Cloud Composer 1 | Cloud Composer 2

이 페이지에서는 Cloud Functions를 사용하여 이벤트에 대한 응답으로 Cloud Composer DAG를 트리거하는 방법을 설명합니다.

Apache Airflow는 DAG를 정기적으로 실행하도록 설계되어 있지만 이벤트에 대한 응답으로 DAG를 트리거할 수도 있습니다. 이를 수행하는 한 가지 방법은 Cloud Functions를 사용하여 지정된 이벤트가 발생할 때 Cloud Composer DAG를 트리거하는 것입니다.

이 가이드의 예시는 Cloud Storage 버킷에서 변경사항이 발생할 때마다 DAG를 실행합니다. 버킷의 객체가 변경되면 함수가 트리거됩니다. 이 함수는 Cloud Composer 환경의 Airflow REST API를 요청합니다. Airflow가 이 요청을 처리하고 DAG를 실행합니다. DAG는 변경사항에 대한 정보를 출력합니다.

시작하기 전에

환경의 네트워킹 구성 확인

비공개 IP 및 VPC 서비스 제어 구성에서는 Cloud Functions에서 Airflow 웹 서버로의 연결을 구성할 수 없으므로 해당 구성에서는 이 솔루션이 작동하지 않습니다.

Cloud Composer 2에서 또 다른 접근 방법인 Cloud Functions 및 Pub/Sub 메시지를 사용하여 DAG 트리거를 사용할 수 있습니다.

프로젝트에 API 사용 설정

콘솔

Enable the Cloud Composer and Cloud Functions APIs.

Enable the APIs

gcloud

Enable the Cloud Composer and Cloud Functions APIs:

gcloud services enable cloudfunctions.googleapis.com composer.googleapis.com

Airflow REST API 사용 설정

Airflow 버전에 따라 다음 안내를 따르세요.

웹 서버 액세스 제어를 사용하여 Airflow REST API에 대한 API 호출 허용

Cloud Functions는 IPv4 또는 IPv6 주소를 사용하여 Airflow REST API에 연결할 수 있습니다.

호출 IP 범위를 모르는 경우 웹 서버 액세스 제어의 기본 구성 옵션인 All IP addresses have access (default)를 사용하여 Cloud Functions를 실수로 차단하지 않도록 합니다.

Cloud Storage 버킷 만들기

이 예시에서는 Cloud Storage 버킷의 변경사항에 대한 응답으로 DAG를 트리거합니다. 이 예시에 사용할 새 버킷을 만듭니다.

Airflow 웹 서버 URL 가져오기

이 예시에서는 Airflow 웹 서버 엔드포인트에 REST API 요청을 보냅니다. Cloud 함수 코드에서 .appspot.com 앞에 Airflow 웹 인터페이스 URL의 일부를 사용합니다.

콘솔

  1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

    환경으로 이동

  2. 환경 이름을 클릭합니다.

  3. 환경 세부정보 페이지에서 환경 구성 탭으로 이동합니다.

  4. Airflow 웹 서버의 URL이 Airflow 웹 UI 항목에 나열됩니다.

gcloud

다음 명령어를 실행합니다.

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format='value(config.airflowUri)'

다음과 같이 바꿉니다.

  • ENVIRONMENT_NAME을 환경 이름으로 바꿉니다.
  • LOCATION을 환경이 위치한 리전으로 바꿉니다.

IAM 프록시의 client_id 가져오기

Airflow REST API 엔드포인트를 요청하기 위해 함수에는 Airflow 웹 서버를 보호하는 Identity and Access Management 프록시의 클라이언트 ID가 필요합니다.

Cloud Composer는 이 정보를 직접 제공하지 않습니다. 대신 Airflow 웹 서버에 인증되지 않은 요청을 하고 리디렉션 URL에서 클라이언트 ID를 캡처합니다.

cURL

curl -v AIRFLOW_URL 2>&1 >/dev/null | grep -o "client_id\=[A-Za-z0-9-]*\.apps\.googleusercontent\.com"

AIRFLOW_URL를 Airflow 웹 인터페이스의 URL로 바꿉니다.

출력에서 client_id 다음에 오는 문자열을 검색합니다. 예를 들면 다음과 같습니다.

client_id=836436932391-16q2c5f5dcsfnel77va9bvf4j280t35c.apps.googleusercontent.com

Python

다음 코드를 get_client_id.py 파일에 저장합니다. project_id, location, composer_environment 값을 입력한 후 Cloud Shell이나 로컬 환경에서 코드를 실행합니다.

# This script is intended to be used with Composer 1 environments
# In Composer 2, the Airflow Webserver is not in the tenant project
# so there is no tenant client ID
# See https://cloud.google.com/composer/docs/composer-2/environment-architecture
# for more details
import google.auth
import google.auth.transport.requests
import requests
import six.moves.urllib.parse

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
composer_response = authed_session.request("GET", environment_url)
environment_data = composer_response.json()
composer_version = environment_data["config"]["softwareConfig"]["imageVersion"]
if "composer-1" not in composer_version:
    version_error = (
        "This script is intended to be used with Composer 1 environments. "
        "In Composer 2, the Airflow Webserver is not in the tenant project, "
        "so there is no tenant client ID. "
        "See https://cloud.google.com/composer/docs/composer-2/environment-architecture for more details."
    )
    raise (RuntimeError(version_error))
airflow_uri = environment_data["config"]["airflowUri"]

# The Composer environment response does not include the IAP client ID.
# Make a second, unauthenticated HTTP request to the web server to get the
# redirect URI.
redirect_response = requests.get(airflow_uri, allow_redirects=False)
redirect_location = redirect_response.headers["location"]

# Extract the client_id query parameter from the redirect.
parsed = six.moves.urllib.parse.urlparse(redirect_location)
query_string = six.moves.urllib.parse.parse_qs(parsed.query)
print(query_string["client_id"][0])

환경에 DAG 업로드

환경에 DAG를 업로드합니다. 다음 예시 DAG는 수신된 DAG 실행 구성을 출력합니다. 이 가이드에서 나중에 만드는 함수로부터 이 DAG를 트리거합니다.

import datetime

import airflow
from airflow.operators.bash import BashOperator

with airflow.DAG(
    "composer_sample_trigger_response_dag",
    start_date=datetime.datetime(2021, 1, 1),
    # Not scheduled, trigger only
    schedule_interval=None,
) as dag:
    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = BashOperator(
        task_id="print_gcs_info", bash_command="echo {{ dag_run.conf }}"
    )

DAG를 트리거하는 Cloud 함수 배포

Cloud Functions 또는 Cloud Run에서 지원되는 선호 언어를 사용하여 Cloud 함수를 배포할 수 있습니다. 이 튜토리얼에서는 PythonJava로 구현된 Cloud 함수를 보여줍니다.

Cloud 함수 구성 매개변수 지정

  • 트리거. 이 예시에서는 새 객체가 버킷에 생성될 때 또는 기존 객체를 덮어쓸 때 작동하는 트리거를 선택합니다.

    • 트리거 유형. Cloud Storage.

    • 이벤트 유형. 완료/생성.

    • 버킷. 이 함수를 트리거해야 하는 버킷을 선택합니다.

    • 실패 시 재시도. 이 예시에서는 이 옵션을 사용 중지하는 것이 좋습니다. 프로덕션 환경에서 함수를 사용할 경우 일시적인 오류를 처리하기 위해 이 옵션을 사용 설정합니다.

  • 런타임, 빌드, 연결, 보안 설정 섹션의 런타임 서비스 계정. 환경설정에 따라 다음 옵션 중 하나를 사용합니다.

    • Compute Engine 기본 서비스 계정을 선택합니다. 기본 IAM 권한의 경우 이 계정이 Cloud Composer 환경에 액세스하는 함수를 실행할 수 있습니다.

    • Composer User 역할이 있는 커스텀 서비스 계정을 만들고 이 함수에 대한 런타임 서비스 계정으로 지정합니다. 이 옵션은 최소 권한 원칙을 따릅니다.

  • 코드 단계의 런타임 및 진입점. 이 예시의 코드를 추가할 때 Python 3.7 또는 더 최근의 런타임을 선택하고 trigger_dag를 시작점으로 지정합니다.

요구사항 추가

requirements.txt 파일에 종속 항목을 지정합니다.

requests-toolbelt==1.0.0
google-auth==2.19.1
google-cloud-pubsub==2.17.0

다음 코드를 main.py 파일에 넣고 다음과 같이 바꿉니다.

  • client_id 변수 값을 이전에 가져온 client_id 값으로 바꿉니다.

  • webserver_id 변수 값을 .appspot.com 앞의 Airflow 웹 인터페이스 URL에 포함된 테넌트 프로젝트 ID로 바꿉니다. 이전에 Airflow 웹 인터페이스 URL을 가져왔습니다.

  • 사용할 Airflow REST API 버전을 지정합니다.

    • 안정적인 Airflow REST API를 사용하는 경우 USE_EXPERIMENTAL_API 변수를 False로 설정합니다.
    • 실험용 Airflow REST API를 사용하는 경우 변경할 필요가 없습니다. USE_EXPERIMENTAL_API 변수가 이미 True로 설정되어 있습니다.


from google.auth.transport.requests import Request
from google.oauth2 import id_token
import requests

IAM_SCOPE = "https://www.googleapis.com/auth/iam"
OAUTH_TOKEN_URI = "https://www.googleapis.com/oauth2/v4/token"
# If you are using the stable API, set this value to False
# For more info about Airflow APIs see https://cloud.google.com/composer/docs/access-airflow-api
USE_EXPERIMENTAL_API = True

def trigger_dag(data, context=None):
    """Makes a POST request to the Composer DAG Trigger API

    When called via Google Cloud Functions (GCF),
    data and context are Background function parameters.

    For more info, refer to
    https://cloud.google.com/functions/docs/writing/background#functions_background_parameters-python

    To call this function from a Python script, omit the ``context`` argument
    and pass in a non-null value for the ``data`` argument.

    This function is currently only compatible with Composer v1 environments.
    """

    # Fill in with your Composer info here
    # Navigate to your webserver's login page and get this from the URL
    # Or use the script found at
    # https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/composer/rest/get_client_id.py
    client_id = "YOUR-CLIENT-ID"
    # This should be part of your webserver's URL:
    # {tenant-project-id}.appspot.com
    webserver_id = "YOUR-TENANT-PROJECT"
    # The name of the DAG you wish to trigger
    dag_name = "composer_sample_trigger_response_dag"

    if USE_EXPERIMENTAL_API:
        endpoint = f"api/experimental/dags/{dag_name}/dag_runs"
        json_data = {"conf": data, "replace_microseconds": "false"}
    else:
        endpoint = f"api/v1/dags/{dag_name}/dagRuns"
        json_data = {"conf": data}
    webserver_url = "https://" + webserver_id + ".appspot.com/" + endpoint
    # Make a POST request to IAP which then Triggers the DAG
    make_iap_request(webserver_url, client_id, method="POST", json=json_data)

# This code is copied from
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/iap/make_iap_request.py
# START COPIED IAP CODE
def make_iap_request(url, client_id, method="GET", **kwargs):
    """Makes a request to an application protected by Identity-Aware Proxy.
    Args:
      url: The Identity-Aware Proxy-protected URL to fetch.
      client_id: The client ID used by Identity-Aware Proxy.
      method: The request method to use
              ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                If no timeout is provided, it is set to 90 by default.
    Returns:
      The page body, or raises an exception if the page couldn't be retrieved.
    """
    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    # Obtain an OpenID Connect (OIDC) token from metadata server or using service
    # account.
    google_open_id_connect_token = id_token.fetch_id_token(Request(), client_id)

    # Fetch the Identity-Aware Proxy-protected URL, including an
    # Authorization header containing "Bearer " followed by a
    # Google-issued OpenID Connect token for the service account.
    resp = requests.request(
        method,
        url,
        headers={"Authorization": "Bearer {}".format(google_open_id_connect_token)},
        **kwargs,
    )
    if resp.status_code == 403:
        raise Exception(
            "Service account does not have permission to "
            "access the IAP-protected application."
        )
    elif resp.status_code != 200:
        raise Exception(
            "Bad response from application: {!r} / {!r} / {!r}".format(
                resp.status_code, resp.headers, resp.text
            )
        )
    else:
        return resp.text

# END COPIED IAP CODE

함수 테스트

함수 및 DAG가 의도한 대로 작동하는지 확인하기 위해 다음 안내를 따르세요.

  1. 함수가 배포될 때까지 기다립니다.
  2. 파일을 Cloud Storage 버킷에 업로드합니다. 또는 Google Cloud 콘솔에서 함수 테스트 작업을 선택하여 함수를 수동으로 트리거할 수 있습니다.
  3. Airflow 웹 인터페이스에서 DAG 페이지를 확인합니다. DAG는 DAG 실행이 활성 상태 또는 이미 완료된 상태여야 합니다.
  4. Airflow UI에서 이 실행의 작업 로그를 확인합니다. print_gcs_info 태스크가 함수에서 수신된 데이터를 로그에 출력하는지 확인해야 합니다.
[2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
[2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
    {bucket: example-storage-for-gcf-triggers, contentType: text/plain,
    crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
    ... }
[2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
    return code 0h

다음 단계