Airflow REST API 액세스

Cloud Composer 1 | Cloud Composer 2

Apache Airflow에는 DAG 실행 및 작업 정보 가져오기, DAG 업데이트, Airflow 구성 가져오기, 연결 추가 및 삭제, 사용자 나열과 같은 작업을 수행하는 데 사용할 수 있는 REST API 인터페이스가 있습니다.

Cloud Functions로 Airflow REST API를 사용하는 예시는 Cloud Functions로 DAG 트리거를 참조하세요.

Airflow REST API 버전

Cloud Composer 1에서 사용할 수 있는 Airflow REST API 버전은 다음과 같습니다.

  • Airflow 2는 안정적인 REST API를 사용합니다. 실험용 REST API는 Airflow에서 지원 중단되었습니다.

  • 추가로 설명된 것처럼 Airflow 구성 재정의를 통해 사용 설정한 경우 Airflow 2에서 실험용 REST API를 계속 사용할 수 있습니다.

시작하기 전에

Cloud Composer API 사용 설정

API 사용 설정

안정적인 Airflow REST API 사용 설정

Airflow 2

Airflow 2에서는 안정적인 REST API가 이미 기본적으로 사용 설정되어 있습니다.

Cloud Composer는 IAP(Identity-Aware Proxy)와 통합된 자체 API 인증 백엔드를 사용합니다.

승인은 Airflow에서 제공하는 표준 방법으로 작동합니다. 새 사용자가 API를 통해 승인하면 기본적으로 사용자 계정에 Op 역할이 부여됩니다.

안정적인 REST API를 사용 설정 또는 중지하거나 다음 Airflow 구성 옵션을 재정의하여 기본 사용자 역할을 변경할 수 있습니다.

섹션 참고
api auth_backend airflow.composer.api.backend.composer_auth 안정적인 REST API를 사용 중지하려면 airflow.api.auth.backend.deny_all로 변경합니다.
api composer_auth_user_registration_role Op 다른 역할을 지정할 수 있습니다.

Airflow 1

Airflow 1에서는 안정적인 REST API를 사용할 수 없습니다. 대신 실험용 REST API를 사용할 수 있습니다.

실험용 Airflow REST API 사용 설정

Airflow 2

API 인증 기능은 실험용 API에서 기본적으로 사용 중지됩니다. Airflow 웹 서버에서는 수행하는 모든 요청을 거부합니다.

API 인증 기능과 Airflow 2 실험용 API를 사용 설정하려면 다음 Airflow 구성 옵션을 재정의합니다.

섹션 참고
api auth_backend airflow.api.auth.backend.default 기본값은 airflow.composer.api.backend.composer_auth입니다.
api enable_experimental_api True 기본값은 False입니다.

Airflow 1

기본적으로 API 인증 기능은 Airflow 1.10.11 이상 버전에서는 사용 중지됩니다. Airflow 웹 서버에서는 수행하는 모든 요청을 거부합니다. 요청을 사용하여 DAG를 트리거하므로 이 기능을 사용 설정합니다.

Airflow 1에서 API 인증 기능을 사용 설정하려면 다음 Airflow 구성 옵션을 재정의합니다.

섹션 참고
api auth_backend airflow.api.auth.backend.default 기본값은 airflow.api.auth.backend.deny_all입니다.

api-auth_backend 구성 옵션을 airflow.api.auth.backend.default로 설정하면 Airflow 웹 서버가 인증 없이 모든 API 요청을 수락합니다. Airflow 웹 서버 자체에는 인증이 필요하지 않지만 여전히 자체 인증 레이어를 제공하는 IAP(Identity-Aware Proxy)로 보호됩니다.

Airflow REST API 호출

IAM 프록시의 client_id 가져오기

Airflow REST API 엔드포인트를 요청하기 위해 함수에는 Airflow 웹 서버를 보호하는 IAM 프록시의 클라이언트 ID가 필요합니다.

Cloud Composer는 이 정보를 직접 제공하지 않습니다. 대신 Airflow 웹 서버에 인증되지 않은 요청을 하고 리디렉션 URL에서 클라이언트 ID를 캡처합니다.

cURL

curl -v AIRFLOW_URL 2>&1 >/dev/null | grep -o "client_id\=[A-Za-z0-9-]*\.apps\.googleusercontent\.com"

AIRFLOW_URL를 Airflow 웹 인터페이스의 URL로 바꿉니다.

출력에서 client_id 다음에 오는 문자열을 검색합니다. 예를 들면 다음과 같습니다.

client_id=836436932391-16q2c5f5dcsfnel77va9bvf4j280t35c.apps.googleusercontent.com

Python

다음 코드를 get_client_id.py 파일에 저장합니다. project_id, location, composer_environment 값을 입력한 후 Cloud Shell이나 로컬 환경에서 코드를 실행합니다.

# This script is intended to be used with Composer 1 environments
# In Composer 2, the Airflow Webserver is not in the tenant project
# so there is no tenant client ID
# See https://cloud.google.com/composer/docs/composer-2/environment-architecture
# for more details
import google.auth
import google.auth.transport.requests
import requests
import six.moves.urllib.parse

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
composer_response = authed_session.request("GET", environment_url)
environment_data = composer_response.json()
composer_version = environment_data["config"]["softwareConfig"]["imageVersion"]
if "composer-1" not in composer_version:
    version_error = ("This script is intended to be used with Composer 1 environments. "
                     "In Composer 2, the Airflow Webserver is not in the tenant project, "
                     "so there is no tenant client ID. "
                     "See https://cloud.google.com/composer/docs/composer-2/environment-architecture for more details.")
    raise (RuntimeError(version_error))
airflow_uri = environment_data["config"]["airflowUri"]

# The Composer environment response does not include the IAP client ID.
# Make a second, unauthenticated HTTP request to the web server to get the
# redirect URI.
redirect_response = requests.get(airflow_uri, allow_redirects=False)
redirect_location = redirect_response.headers["location"]

# Extract the client_id query parameter from the redirect.
parsed = six.moves.urllib.parse.urlparse(redirect_location)
query_string = six.moves.urllib.parse.parse_qs(parsed.query)
print(query_string["client_id"][0])

client_id를 사용하여 Airflow REST API 호출

다음을 바꿉니다.

  • client_id 변수의 값을 이전 단계에서 얻은 client_id 값으로 바꿉니다.
  • webserver_id 변수 값을 .appspot.com 앞의 Airflow 웹 인터페이스 URL에 포함된 테넌트 프로젝트 ID로 바꿉니다. 이전 단계에서 Airflow 웹 인터페이스 URL을 가져왔습니다.
  • 사용할 Airflow REST API 버전을 지정합니다.

    • 안정적인 Airflow REST API를 사용하는 경우 USE_EXPERIMENTAL_API 변수를 False로 설정합니다.
    • 실험용 Airflow REST API를 사용하는 경우 변경할 필요가 없습니다. USE_EXPERIMENTAL_API 변수가 이미 True로 설정되어 있습니다.

from google.auth.transport.requests import Request
from google.oauth2 import id_token
import requests

IAM_SCOPE = 'https://www.googleapis.com/auth/iam'
OAUTH_TOKEN_URI = 'https://www.googleapis.com/oauth2/v4/token'
# If you are using the stable API, set this value to False
# For more info about Airflow APIs see https://cloud.google.com/composer/docs/access-airflow-api
USE_EXPERIMENTAL_API = True

def trigger_dag(data, context=None):
    """Makes a POST request to the Composer DAG Trigger API

    When called via Google Cloud Functions (GCF),
    data and context are Background function parameters.

    For more info, refer to
    https://cloud.google.com/functions/docs/writing/background#functions_background_parameters-python

    To call this function from a Python script, omit the ``context`` argument
    and pass in a non-null value for the ``data`` argument.

    This function is currently only compatible with Composer v1 environments.
    """

    # Fill in with your Composer info here
    # Navigate to your webserver's login page and get this from the URL
    # Or use the script found at
    # https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/composer/rest/get_client_id.py
    client_id = 'YOUR-CLIENT-ID'
    # This should be part of your webserver's URL:
    # {tenant-project-id}.appspot.com
    webserver_id = 'YOUR-TENANT-PROJECT'
    # The name of the DAG you wish to trigger
    dag_name = 'composer_sample_trigger_response_dag'

    if USE_EXPERIMENTAL_API:
        endpoint = f'api/experimental/dags/{dag_name}/dag_runs'
        json_data = {'conf': data, 'replace_microseconds': 'false'}
    else:
        endpoint = f'api/v1/dags/{dag_name}/dagRuns'
        json_data = {'conf': data}
    webserver_url = (
        'https://'
        + webserver_id
        + '.appspot.com/'
        + endpoint
    )
    # Make a POST request to IAP which then Triggers the DAG
    make_iap_request(
        webserver_url, client_id, method='POST', json=json_data)

# This code is copied from
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/iap/make_iap_request.py
# START COPIED IAP CODE
def make_iap_request(url, client_id, method='GET', **kwargs):
    """Makes a request to an application protected by Identity-Aware Proxy.
    Args:
      url: The Identity-Aware Proxy-protected URL to fetch.
      client_id: The client ID used by Identity-Aware Proxy.
      method: The request method to use
              ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                If no timeout is provided, it is set to 90 by default.
    Returns:
      The page body, or raises an exception if the page couldn't be retrieved.
    """
    # Set the default timeout, if missing
    if 'timeout' not in kwargs:
        kwargs['timeout'] = 90

    # Obtain an OpenID Connect (OIDC) token from metadata server or using service
    # account.
    google_open_id_connect_token = id_token.fetch_id_token(Request(), client_id)

    # Fetch the Identity-Aware Proxy-protected URL, including an
    # Authorization header containing "Bearer " followed by a
    # Google-issued OpenID Connect token for the service account.
    resp = requests.request(
        method, url,
        headers={'Authorization': 'Bearer {}'.format(
            google_open_id_connect_token)}, **kwargs)
    if resp.status_code == 403:
        raise Exception('Service account does not have permission to '
                        'access the IAP-protected application.')
    elif resp.status_code != 200:
        raise Exception(
            'Bad response from application: {!r} / {!r} / {!r}'.format(
                resp.status_code, resp.headers, resp.text))
    else:
        return resp.text
# END COPIED IAP CODE

서비스 계정을 사용하여 Airflow REST API에 액세스

Airflow 데이터베이스는 이메일 필드 길이를 64자(영문 기준)로 제한합니다. 서비스 계정에 64자보다 긴 이메일 주소가 있는 경우가 있습니다. 일반적인 방법으로는 이러한 서비스 계정의 Airflow 사용자를 만들 수 없습니다.

이 문제를 해결하려면 서비스 계정의 Airflow 사용자를 사전 등록하면 됩니다. 이렇게 하려면 accounts.google.com:NUMERIC_USER_ID를 사용자 이름으로, 모든 고유 문자열을 이메일로 사용하세요.

  1. 서비스 계정의 NUMERIC_USER_ID를 가져오려면 다음을 실행합니다.

    gcloud iam service-accounts describe \
      SA_NAME@PROJECT_ID.iam.gserviceaccount.com \
      --format="value(oauth2ClientId)"
    

    다음과 같이 바꿉니다.

    • SA_NAME를 서비스 계정 이름으로 바꿉니다.
    • PROJECT_ID프로젝트 ID로 바꿉니다.
  2. 서비스 계정에 대한 Op 역할이 있는 Airflow 사용자를 만듭니다.

    Airflow UI

    1. Airflow UI로 이동합니다. Airflow 사용자에게 Admin 역할이 있어야 합니다.

    2. 관리 > 사용자로 이동하고 만들기를 클릭합니다.

    3. accounts.google.com:NUMERIC_USER_ID를 사용자 이름으로 지정합니다. NUMERIC_USER_ID를 이전 단계에서 가져온 사용자 ID로 바꿉니다.

    4. 고유 식별자를 이메일로 지정합니다. 모든 고유한 문자열을 사용할 수 있습니다.

    5. 사용자 역할을 지정합니다. Op).

    6. 사용자의 성과 이름을 지정합니다. 모든 문자열을 사용할 수 있습니다.

    7. 저장을 클릭합니다.

    gcloud

    Airflow 2에서 다음 Airflow CLI 명령어를 실행합니다.

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        users create -- \
        -u accounts.google.com:NUMERIC_USER_ID \
        -e UNIQUE_ID  \
        -f UNIQUE_ID \
        -l - -r Op --use-random-password
    

    다음과 같이 바꿉니다.

    • ENVIRONMENT_NAME을 환경 이름으로 바꿉니다.
    • LOCATION을 환경이 위치한 리전으로 바꿉니다.
    • NUMERIC_USER_ID를 이전 단계에서 가져온 사용자 ID로 바꿉니다.
    • UNIQUE_ID를 Airflow 사용자의 식별자로 바꿉니다. 모든 고유한 문자열을 사용할 수 있습니다.
  3. 서비스 계정의 Airflow 사용자를 만들면 서비스 계정으로 인증된 호출자가 사전 등록된 사용자로 인식되고 Airflow에 로깅됩니다.

다음 단계