Airflow REST API 액세스

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Apache Airflow에는 DAG 실행 및 작업 정보 가져오기, DAG 업데이트, Airflow 구성 가져오기, 연결 추가 및 삭제, 사용자 나열과 같은 작업을 수행하는 데 사용할 수 있는 REST API 인터페이스가 있습니다.

Cloud Run 함수와 함께 Airflow REST API를 사용하는 예시는 Cloud Run 함수로 DAG 트리거를 참고하세요.

Airflow REST API 버전

  • Airflow 2는 안정적인 REST API를 사용합니다.
  • 실험용 REST API는 Airflow에서 지원 중단되었습니다.

안정적인 Airflow REST API 구성

안정적인 REST API는 Airflow 2에서 기본적으로 사용 설정되어 있습니다. Cloud Composer는 자체 API 인증 백엔드를 사용합니다.

승인은 Airflow에서 제공하는 표준 방법으로 작동합니다. 새 사용자가 API를 통해 승인하면 기본적으로 사용자 계정에 Op 역할이 부여됩니다.

안정적인 REST API를 사용 설정 또는 중지하거나 다음 Airflow 구성 옵션을 재정의하여 기본 사용자 역할을 변경할 수 있습니다.

섹션 참고
api (Airflow 2.2.5 이하) auth_backend
(Airflow 2.3.0 이상) auth_backends
airflow.composer.api.backend.composer_auth 안정적인 REST API를 사용 중지하려면 airflow.api.auth.backend.deny_all로 변경합니다.
api composer_auth_user_registration_role Op 다른 역할을 지정할 수 있습니다.

웹 서버 액세스 제어를 사용하여 Airflow REST API에 대한 API 호출 허용

Airflow REST API를 호출하는 데 사용되는 메서드에 따라 호출자 메서드는 IPv4 또는 IPv6 주소를 사용할 수 있습니다. 웹 서버 액세스 제어를 사용하여 Airflow REST API에 대한 IP 트래픽을 차단 해제해야 합니다.

Airflow REST API에 대한 호출을 전송할 IP 주소를 모르는 경우 기본 구성 옵션 All IP addresses have access (default)를 사용합니다.

Airflow REST API 호출

이 섹션에서는 안정적인 Airflow REST API로 DAG를 트리거하는 데 사용할 수 있는 Python 스크립트의 예시를 제공합니다.

다음 예시의 콘텐츠를 composer2_airflow_rest_api.py라는 파일에 삽입한 다음 다음 변수를 설정합니다.

  • dag_id: DAG 소스 파일에 정의된 DAG의 이름입니다.
  • dag_config: DAG 실행의 구성입니다.
  • web_server_url: Airflow 웹 서버 URL입니다. 형식은 https://<web-server-id>.composer.googleusercontent.com입니다.

from __future__ import annotations

from typing import Any

import google.auth
from google.auth.transport.requests import AuthorizedSession
import requests


# Following GCP best practices, these credentials should be
# constructed at start-up time and used throughout
# https://cloud.google.com/apis/docs/client-libraries-best-practices
AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])


def make_composer2_web_server_request(
    url: str, method: str = "GET", **kwargs: Any
) -> google.auth.transport.Response:
    """
    Make a request to Cloud Composer 2 environment's web server.
    Args:
      url: The URL to fetch.
      method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
        'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                  If no timeout is provided, it is set to 90 by default.
    """

    authed_session = AuthorizedSession(CREDENTIALS)

    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    return authed_session.request(method, url, **kwargs)


def trigger_dag(web_server_url: str, dag_id: str, data: dict) -> str:
    """
    Make a request to trigger a dag using the stable Airflow 2 REST API.
    https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html

    Args:
      web_server_url: The URL of the Airflow 2 web server.
      dag_id: The DAG ID.
      data: Additional configuration parameters for the DAG run (json).
    """

    endpoint = f"api/v1/dags/{dag_id}/dagRuns"
    request_url = f"{web_server_url}/{endpoint}"
    json_data = {"conf": data}

    response = make_composer2_web_server_request(
        request_url, method="POST", json=json_data
    )

    if response.status_code == 403:
        raise requests.HTTPError(
            "You do not have a permission to perform this operation. "
            "Check Airflow RBAC roles for your account."
            f"{response.headers} / {response.text}"
        )
    elif response.status_code != 200:
        response.raise_for_status()
    else:
        return response.text




if __name__ == "__main__":
    # TODO(developer): replace with your values
    dag_id = "your-dag-id"  # Replace with the ID of the DAG that you want to run.
    dag_config = {
        "your-key": "your-value"
    }  # Replace with configuration parameters for the DAG run.
    # Replace web_server_url with the Airflow web server address. To obtain this
    # URL, run the following command for your environment:
    # gcloud composer environments describe example-environment \
    #  --location=your-composer-region \
    #  --format="value(config.airflowUri)"
    web_server_url = (
        "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com"
    )

    response_text = trigger_dag(
        web_server_url=web_server_url, dag_id=dag_id, data=dag_config
    )

    print(response_text)

서비스 계정을 사용하여 Airflow REST API에 액세스

Airflow 데이터베이스는 이메일 필드 길이를 64자(영문 기준)로 제한합니다. 서비스 계정에 64자보다 긴 이메일 주소가 있는 경우가 있습니다. 일반적인 방법으로는 이러한 서비스 계정의 Airflow 사용자를 만들 수 없습니다. 이러한 서비스 계정에 대한 Airflow 없을 때 Airflow REST API에 액세스하면 HTTP 오류 401 및 403이 발생합니다.

이 문제를 해결하려면 서비스 계정의 Airflow 사용자를 사전 등록하면 됩니다. 이렇게 하려면 accounts.google.com:NUMERIC_USER_ID를 사용자 이름으로, 모든 고유 문자열을 이메일로 사용하세요.

  1. 서비스 계정의 NUMERIC_USER_ID를 가져오려면 다음을 실행합니다.

    gcloud iam service-accounts describe \
      SA_NAME@PROJECT_ID.iam.gserviceaccount.com \
      --format="value(oauth2ClientId)"
    

    다음과 같이 바꿉니다.

    • SA_NAME를 서비스 계정 이름으로 바꿉니다.
    • PROJECT_ID프로젝트 ID로 바꿉니다.
  2. 서비스 계정에 대한 Op 역할이 있는 Airflow 사용자를 만듭니다.

    Airflow UI

    1. Airflow UI로 이동합니다.

    2. 보안 > 사용자 나열로 이동하고 새 레코드 추가를 클릭합니다. 이 페이지를 열려면 Airflow 사용자에게 Admin 역할이 있어야 합니다.

    3. accounts.google.com:NUMERIC_USER_ID를 사용자 이름으로 지정합니다. NUMERIC_USER_ID를 이전 단계에서 가져온 사용자 ID로 바꿉니다.

    4. 고유 식별자를 이메일로 지정합니다. 모든 고유한 문자열을 사용할 수 있습니다.

    5. 사용자 역할을 지정합니다. 예를 들면 Op입니다.

    6. 활성 여부 체크박스가 선택되어 있는지 확인합니다.

    7. 사용자의 성과 이름을 지정합니다. 모든 문자열을 사용할 수 있습니다.

    8. 저장을 클릭합니다.

    gcloud

    다음 Airflow CLI 명령어를 실행합니다.

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        users create -- \
        -u accounts.google.com:NUMERIC_USER_ID \
        -e UNIQUE_ID  \
        -f UNIQUE_ID \
        -l - -r Op --use-random-password
    

    다음과 같이 바꿉니다.

    • ENVIRONMENT_NAME: 환경 이름
    • LOCATION을 환경이 위치한 리전으로 바꿉니다.
    • NUMERIC_USER_ID를 이전 단계에서 가져온 사용자 ID로 바꿉니다.
    • UNIQUE_ID를 Airflow 사용자의 식별자로 바꿉니다. 모든 고유한 문자열을 사용할 수 있습니다.
  3. 서비스 계정의 Airflow 사용자를 만들면 서비스 계정으로 인증된 호출자가 사전 등록된 사용자로 인식되고 Airflow에 로깅됩니다.

Airflow REST API 구성요소 확장

Airflow REST API 및 Airflow UI 엔드포인트는 Airflow 웹 서버 내에서 실행됩니다. REST API를 집중적으로 사용할 경우에는 예상 로드에 따라 Airflow 웹 서버에서 사용할 수 있는 CPU 및 메모리 양을 늘리는 것이 좋습니다.

다음 단계