Cloud Functions로 DAG 트리거

이 페이지에서는 이벤트 기반 DAG 트리거에 Cloud Functions를 사용하는 방법을 설명합니다.

Airflow는 DAG를 정기적으로 실행하도록 설계되어 있지만 이벤트에 응답하여 DAG를 트리거할 수 있습니다. 이를 수행하는 한 가지 방법은 Cloud Functions를 사용하여 지정된 이벤트가 발생할 때 Cloud Composer DAG를 트리거하도록 하는 것입니다. 예를 들어 Cloud Storage 버킷에서 객체가 변경되거나 메시지가 Pub/Sub 주제에 게시될 때 DAG를 트리거하는 함수를 만들 수 있습니다.

이 가이드의 예시는 Cloud Storage 버킷에서 변경사항이 발생할 때마다 DAG를 실행합니다. 버킷의 객체가 변경되면 함수가 트리거됩니다. 이 함수는 Cloud Composer 환경의 Airflow REST API를 요청합니다. Airflow가 이 요청을 처리하고 DAG를 실행합니다. DAG는 변경사항에 대한 정보를 출력합니다.

시작하기 전에

프로젝트에 API 사용 설정

Cloud Composer and Cloud Functions API를 사용 설정합니다.

API 사용 설정

Airflow REST API 사용 설정

Airflow 버전에 따라 다음 안내를 따르세요.

Airflow 웹 서버 URL 가져오기

함수가 Airflow 웹 서버 엔드포인트를 요청하므로 Airflow 웹 서버의 URL을 가져옵니다.

Console

Airflow 웹 서버 URL을 가져오려면 다음 안내를 따르세요.

  1. 환경 페이지를 엽니다.

    환경 페이지 열기

  2. 환경 이름을 클릭합니다.
  3. 환경 구성에서 Airflow 웹 UI 항목을 참조하세요.

gcloud

Airflow 웹 서버 URL을 가져오기 위해 다음 명령어를 실행합니다.

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format='value(config.airflowUri)'

다음과 같이 바꿉니다.

  • ENVIRONMENT_NAME을 환경 이름으로 바꿉니다.
  • LOCATION: 환경이 위치한 Compute Engine 리전입니다.

IAM 프록시의 client_id 가져오기

Airflow REST API 엔드포인트를 요청하기 위해 함수에는 Airflow 웹 서버를 보호하는 IAM 프록시의 클라이언트 ID가 필요합니다.

Cloud Composer는 이 정보를 직접 제공하지 않습니다. 대신 Airflow 웹 서버에 인증되지 않은 요청을 하고 리디렉션 URL에서 클라이언트 ID를 캡처합니다.

cURL

curl -v AIRFLOW_URL 2>&1 >/dev/null | grep -o "client_id\=[A-Za-z0-9-]*\.apps\.googleusercontent\.com"

AIRFLOW_URL를 Airflow 웹 인터페이스의 URL로 바꿉니다.

출력에서 client_id 다음에 오는 문자열을 검색합니다. 예를 들면 다음과 같습니다.

client_id=836436932391-16q2c5f5dcsfnel77va9bvf4j280t35c.apps.googleusercontent.com

Python

다음 코드를 get_client_id.py 파일에 저장합니다. project_id, location, composer_environment 값을 입력한 다음 Cloud Shell 또는 로컬 환경에서 코드를 실행합니다.

python get_client_id.py

import google.auth
import google.auth.transport.requests
import requests
import six.moves.urllib.parse

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=['https://www.googleapis.com/auth/cloud-platform'])
authed_session = google.auth.transport.requests.AuthorizedSession(
    credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    'https://composer.googleapis.com/v1beta1/projects/{}/locations/{}'
    '/environments/{}').format(project_id, location, composer_environment)
composer_response = authed_session.request('GET', environment_url)
environment_data = composer_response.json()
airflow_uri = environment_data['config']['airflowUri']

# The Composer environment response does not include the IAP client ID.
# Make a second, unauthenticated HTTP request to the web server to get the
# redirect URI.
redirect_response = requests.get(airflow_uri, allow_redirects=False)
redirect_location = redirect_response.headers['location']

# Extract the client_id query parameter from the redirect.
parsed = six.moves.urllib.parse.urlparse(redirect_location)
query_string = six.moves.urllib.parse.parse_qs(parsed.query)
print(query_string['client_id'][0])

Cloud Storage 버킷 만들기

이 예시에서 Cloud Storage 버킷의 변경사항에 대한 응답으로 DAG를 트리거하므로 이 예시에 사용할 새 버킷을 만듭니다.

Cloud Functions에서 DAG 트리거

환경에 DAG 업로드

환경에 DAG를 업로드합니다. 다음 예시 DAG는 수신된 DAG 실행 구성을 출력합니다. 이 가이드에서 나중에 만드는 함수로부터 이 DAG를 트리거합니다.

import datetime

import airflow
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        start_date=datetime.datetime(2021, 1, 1),
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

DAG를 트리거하는 Cloud 함수 배포

다음 구성 매개변수 및 콘텐츠를 사용하여 Python Cloud 함수를 배포합니다.

Cloud 함수 구성 매개변수 지정

  • 트리거. 이 예시에서는 새 객체가 버킷에 생성될 때 또는 기존 객체를 덮어쓸 때 작동하는 트리거를 선택합니다.

    • 트리거 유형. Cloud Storage

    • 이벤트 유형. 완료/생성.

    • 버킷. 이 함수를 트리거해야 하는 버킷을 선택합니다.

    • 실패 시 재시도. 이 예시에서는 이 옵션을 사용 중지하는 것이 좋습니다. 프로덕션 환경에서 함수를 사용할 경우 일시적인 오류를 처리하기 위해 이 옵션을 사용 설정합니다.

  • 런타임 서비스 계정. 환경설정에 따라 다음 옵션 중 하나를 사용합니다.

    • Compute Engine 기본 서비스 계정을 선택합니다. 기본 IAM 권한의 경우 이 계정이 Cloud Composer 환경에 액세스하는 함수를 실행할 수 있습니다.

    • Composer User 역할이 있는 커스텀 서비스 계정을 만들고 이 함수에 대한 런타임 서비스 계정으로 지정합니다. 이 옵션은 최소 권한 원칙을 따릅니다.

  • 런타임 및 시작점. 이 예시의 코드를 추가할 때 Python 3.7 또는 더 높은 런타임을 선택하고 trigger_dag를 시작점으로 지정합니다.

요구사항 추가

requirements.txt 파일에 종속 항목을 지정합니다.

requests_toolbelt==0.9.1
google-auth==2.0.0

함수 코드 추가

다음 코드를 main.py 파일에 넣고 다음과 같이 바꿉니다.

  • client_id 변수의 값을 이전 단계에서 가져온 client_id 값으로 바꿉니다.
  • webserver_id 변수 값을 .appspot.com 앞의 Airflow 웹 인터페이스 URL에 포함된 테넌트 프로젝트 ID로 바꿉니다. 이전 단계에서 Airflow 웹 인터페이스 URL을 가져왔습니다.
  • 사용할 Airflow REST API 버전을 지정합니다.

    • 안정적인 Airflow REST API를 사용하는 경우 USE_EXPERIMENTAL_API 변수를 False로 설정합니다.
    • 실험용 Airflow REST API를 사용하는 경우 변경할 필요가 없습니다. USE_EXPERIMENTAL_API 변수가 이미 True로 설정되어 있습니다.

from google.auth.transport.requests import Request
from google.oauth2 import id_token
import requests

IAM_SCOPE = 'https://www.googleapis.com/auth/iam'
OAUTH_TOKEN_URI = 'https://www.googleapis.com/oauth2/v4/token'
# If you are using the stable API, set this value to False
# For more info about Airflow APIs see https://cloud.google.com/composer/docs/access-airflow-api
USE_EXPERIMENTAL_API = True

def trigger_dag(data, context=None):
    """Makes a POST request to the Composer DAG Trigger API

    When called via Google Cloud Functions (GCF),
    data and context are Background function parameters.

    For more info, refer to
    https://cloud.google.com/functions/docs/writing/background#functions_background_parameters-python

    To call this function from a Python script, omit the ``context`` argument
    and pass in a non-null value for the ``data`` argument.
    """

    # Fill in with your Composer info here
    # Navigate to your webserver's login page and get this from the URL
    # Or use the script found at
    # https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/composer/rest/get_client_id.py
    client_id = 'YOUR-CLIENT-ID'
    # This should be part of your webserver's URL:
    # {tenant-project-id}.appspot.com
    webserver_id = 'YOUR-TENANT-PROJECT'
    # The name of the DAG you wish to trigger
    dag_name = 'composer_sample_trigger_response_dag'

    if USE_EXPERIMENTAL_API:
        endpoint = f'api/experimental/dags/{dag_name}/dag_runs'
        json_data = {'conf': data, 'replace_microseconds': 'false'}
    else:
        endpoint = f'api/v1/dags/{dag_name}/dagRuns'
        json_data = {'conf': data}
    webserver_url = (
        'https://'
        + webserver_id
        + '.appspot.com/'
        + endpoint
    )
    # Make a POST request to IAP which then Triggers the DAG
    make_iap_request(
        webserver_url, client_id, method='POST', json=json_data)

# This code is copied from
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/iap/make_iap_request.py
# START COPIED IAP CODE
def make_iap_request(url, client_id, method='GET', **kwargs):
    """Makes a request to an application protected by Identity-Aware Proxy.
    Args:
      url: The Identity-Aware Proxy-protected URL to fetch.
      client_id: The client ID used by Identity-Aware Proxy.
      method: The request method to use
              ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                If no timeout is provided, it is set to 90 by default.
    Returns:
      The page body, or raises an exception if the page couldn't be retrieved.
    """
    # Set the default timeout, if missing
    if 'timeout' not in kwargs:
        kwargs['timeout'] = 90

    # Obtain an OpenID Connect (OIDC) token from metadata server or using service
    # account.
    google_open_id_connect_token = id_token.fetch_id_token(Request(), client_id)

    # Fetch the Identity-Aware Proxy-protected URL, including an
    # Authorization header containing "Bearer " followed by a
    # Google-issued OpenID Connect token for the service account.
    resp = requests.request(
        method, url,
        headers={'Authorization': 'Bearer {}'.format(
            google_open_id_connect_token)}, **kwargs)
    if resp.status_code == 403:
        raise Exception('Service account does not have permission to '
                        'access the IAP-protected application.')
    elif resp.status_code != 200:
        raise Exception(
            'Bad response from application: {!r} / {!r} / {!r}'.format(
                resp.status_code, resp.headers, resp.text))
    else:
        return resp.text
# END COPIED IAP CODE

함수 테스트

함수 및 DAG가 의도한 대로 작동하는지 확인하기 위해 다음 안내를 따르세요.

  1. 함수가 배포될 때까지 기다립니다.
  2. 파일을 Cloud Storage 버킷에 업로드합니다. 또는 Google Cloud Console에서 함수 테스트 작업을 선택하여 함수를 수동으로 트리거할 수 있습니다.
  3. Airflow 웹 인터페이스에서 DAG 페이지를 확인합니다. DAG는 DAG 실행이 활성 상태 또는 이미 완료된 상태여야 합니다.
  4. Airflow 웹 인터페이스에서 이 실행에 대한 태스크 로그를 확인합니다. print_gcs_info 태스크가 함수에서 수신된 데이터를 로그에 출력하는지 확인해야 합니다.
[2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
[2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
    {bucket: example-storage-for-gcf-triggers, contentType: text/plain,
    crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
    ... }
[2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
    return code 0

다음 단계