Cloud Functions로 DAG 트리거

Cloud Composer 1 | Cloud Composer 2

이 페이지에서는 Cloud Functions를 사용하여 이벤트에 대한 응답으로 DAG를 트리거하는 방법을 설명합니다.

Airflow는 DAG를 정기적으로 실행하도록 설계되어 있지만 이벤트에 대한 응답으로 DAG를 트리거할 수도 있습니다. 이를 수행하는 한 가지 방법은 Cloud Functions를 사용하여 지정된 이벤트가 발생할 때 Cloud Composer DAG를 트리거하는 것입니다. 예를 들어 Cloud Storage 버킷에서 객체가 변경되거나 메시지가 Pub/Sub 주제에 게시될 때 DAG를 트리거하는 함수를 만들 수 있습니다.

이 가이드의 예는 Cloud Storage 버킷에서 변경사항이 발생할 때마다 DAG를 실행합니다. 버킷의 객체가 변경되면 함수가 트리거됩니다. 이 함수는 Cloud Composer 환경의 Airflow REST API를 요청합니다. Airflow가 이 요청을 처리하고 DAG를 실행합니다. DAG는 변경사항에 대한 정보를 출력합니다.

시작하기 전에

프로젝트에 API 사용 설정

Console

  • API Cloud Composer and Cloud Functions 사용 설정

    API 사용 설정

  • gcloud

  • Cloud Composer and Cloud Functions API 사용 설정

    gcloud services enable cloudfunctions.googleapis.comcomposer.googleapis.com
  • Airflow REST API 사용 설정

    Airflow 버전에 따라 다음 안내를 따르세요.

    Cloud Storage 버킷 만들기

    이 예시에서는 Cloud Storage 버킷의 변경사항에 대한 응답으로 DAG를 트리거합니다. 이 예시에 사용할 새 버킷을 만듭니다.

    Airflow 웹 서버 URL 가져오기

    이 예시에서는 Airflow 웹 서버 엔드포인트에 REST API 요청을 보냅니다. Cloud 함수 코드에서 .appspot.com 앞에 Airflow 웹 인터페이스 URL의 일부를 사용합니다.

    Console

    1. Google Cloud Console에서 환경 페이지로 이동합니다.

      환경으로 이동

    2. 환경 이름을 클릭합니다.

    3. 환경 세부정보 페이지에서 환경 세부정보 탭으로 이동합니다.

    4. Airflow 웹 서버의 URL이 Airflow 웹 UI 항목에 나열됩니다.

    gcloud

    다음 명령어를 실행합니다.

    gcloud composer environments describe ENVIRONMENT_NAME \
        --location LOCATION \
        --format='value(config.airflowUri)'
    

    다음과 같이 바꿉니다.

    • ENVIRONMENT_NAME을 환경 이름으로 바꿉니다.
    • LOCATION을 환경이 위치한 리전으로 바꿉니다.

    IAM 프록시의 client_id 가져오기

    Airflow REST API 엔드포인트를 요청하기 위해 함수에는 Airflow 웹 서버를 보호하는 IAM 프록시의 클라이언트 ID가 필요합니다.

    Cloud Composer는 이 정보를 직접 제공하지 않습니다. 대신 Airflow 웹 서버에 인증되지 않은 요청을 하고 리디렉션 URL에서 클라이언트 ID를 캡처합니다.

    cURL

    curl -v AIRFLOW_URL 2>&1 >/dev/null | grep -o "client_id\=[A-Za-z0-9-]*\.apps\.googleusercontent\.com"
    

    AIRFLOW_URL를 Airflow 웹 인터페이스의 URL로 바꿉니다.

    출력에서 client_id 다음에 오는 문자열을 검색합니다. 예를 들면 다음과 같습니다.

    client_id=836436932391-16q2c5f5dcsfnel77va9bvf4j280t35c.apps.googleusercontent.com
    

    Python

    다음 코드를 get_client_id.py 파일에 저장합니다. project_id, location, composer_environment 값을 입력한 후 Cloud Shell이나 로컬 환경에서 코드를 실행합니다.

    # This script is intended to be used with Composer 1 environments
    # In Composer 2, the Airflow Webserver is not in the tenant project
    # so there is no tenant client ID
    # See https://cloud.google.com/composer/docs/composer-2/environment-architecture
    # for more details
    import google.auth
    import google.auth.transport.requests
    import requests
    import six.moves.urllib.parse
    
    # Authenticate with Google Cloud.
    # See: https://cloud.google.com/docs/authentication/getting-started
    credentials, _ = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    authed_session = google.auth.transport.requests.AuthorizedSession(credentials)
    
    # project_id = 'YOUR_PROJECT_ID'
    # location = 'us-central1'
    # composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'
    
    environment_url = (
        "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
        "/environments/{}"
    ).format(project_id, location, composer_environment)
    composer_response = authed_session.request("GET", environment_url)
    environment_data = composer_response.json()
    composer_version = environment_data["config"]["softwareConfig"]["imageVersion"]
    if "composer-1" not in composer_version:
        version_error = ("This script is intended to be used with Composer 1 environments. "
                         "In Composer 2, the Airflow Webserver is not in the tenant project, "
                         "so there is no tenant client ID. "
                         "See https://cloud.google.com/composer/docs/composer-2/environment-architecture for more details.")
        raise (RuntimeError(version_error))
    airflow_uri = environment_data["config"]["airflowUri"]
    
    # The Composer environment response does not include the IAP client ID.
    # Make a second, unauthenticated HTTP request to the web server to get the
    # redirect URI.
    redirect_response = requests.get(airflow_uri, allow_redirects=False)
    redirect_location = redirect_response.headers["location"]
    
    # Extract the client_id query parameter from the redirect.
    parsed = six.moves.urllib.parse.urlparse(redirect_location)
    query_string = six.moves.urllib.parse.parse_qs(parsed.query)
    print(query_string["client_id"][0])

    Cloud Functions에서 DAG 트리거

    환경에 DAG 업로드

    환경에 DAG를 업로드합니다. 다음 예시 DAG는 수신된 DAG 실행 구성을 출력합니다. 이 가이드에서 나중에 만드는 함수로부터 이 DAG를 트리거합니다.

    import datetime
    
    import airflow
    from airflow.operators.bash_operator import BashOperator
    
    with airflow.DAG(
            'composer_sample_trigger_response_dag',
            start_date=datetime.datetime(2021, 1, 1),
            # Not scheduled, trigger only
            schedule_interval=None) as dag:
    
        # Print the dag_run's configuration, which includes information about the
        # Cloud Storage object change.
        print_gcs_info = BashOperator(
            task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

    DAG를 트리거하는 Cloud 함수 배포

    다음 구성 매개변수 및 콘텐츠를 사용하여 Python Cloud 함수를 배포합니다.

    Cloud 함수 구성 매개변수 지정

    • 트리거. 이 예시에서는 새 객체가 버킷에 생성될 때 또는 기존 객체를 덮어쓸 때 작동하는 트리거를 선택합니다.

      • 트리거 유형. Cloud Storage

      • 이벤트 유형. 완료/생성.

      • 버킷. 이 함수를 트리거해야 하는 버킷을 선택합니다.

      • 실패 시 재시도. 이 예시에서는 이 옵션을 사용 중지하는 것이 좋습니다. 프로덕션 환경에서 함수를 사용할 경우 일시적인 오류를 처리하기 위해 이 옵션을 사용 설정합니다.

    • 런타임, 빌드, 연결, 보안 설정 섹션의 런타임 서비스 계정. 환경설정에 따라 다음 옵션 중 하나를 사용합니다.

      • Compute Engine 기본 서비스 계정을 선택합니다. 기본 IAM 권한의 경우 이 계정이 Cloud Composer 환경에 액세스하는 함수를 실행할 수 있습니다.

      • Composer User 역할이 있는 커스텀 서비스 계정을 만들고 이 함수에 대한 런타임 서비스 계정으로 지정합니다. 이 옵션은 최소 권한 원칙을 따릅니다.

    • 코드 단계의 런타임 및 진입점. 이 예시의 코드를 추가할 때 Python 3.7 또는 더 높은 런타임을 선택하고 trigger_dag를 시작점으로 지정합니다.

    요구사항 추가

    requirements.txt 파일에 종속 항목을 지정합니다.

    requests_toolbelt==0.9.1
    google-auth==2.6.2
    

    Cloud 함수 코드 추가

    다음 코드를 main.py 파일에 넣고 다음과 같이 바꿉니다.

    • client_id 변수 값을 이전에 가져온 client_id 값으로 바꿉니다.

    • webserver_id 변수 값을 .appspot.com 앞의 Airflow 웹 인터페이스 URL에 포함된 테넌트 프로젝트 ID로 바꿉니다. 이전에 Airflow 웹 인터페이스 URL을 가져왔습니다.

    • 사용할 Airflow REST API 버전을 지정합니다.

      • 안정적인 Airflow REST API를 사용하는 경우 USE_EXPERIMENTAL_API 변수를 False로 설정합니다.
      • 실험용 Airflow REST API를 사용하는 경우 변경할 필요가 없습니다. USE_EXPERIMENTAL_API 변수가 이미 True로 설정되어 있습니다.
    
    from google.auth.transport.requests import Request
    from google.oauth2 import id_token
    import requests
    
    IAM_SCOPE = 'https://www.googleapis.com/auth/iam'
    OAUTH_TOKEN_URI = 'https://www.googleapis.com/oauth2/v4/token'
    # If you are using the stable API, set this value to False
    # For more info about Airflow APIs see https://cloud.google.com/composer/docs/access-airflow-api
    USE_EXPERIMENTAL_API = True
    
    def trigger_dag(data, context=None):
        """Makes a POST request to the Composer DAG Trigger API
    
        When called via Google Cloud Functions (GCF),
        data and context are Background function parameters.
    
        For more info, refer to
        https://cloud.google.com/functions/docs/writing/background#functions_background_parameters-python
    
        To call this function from a Python script, omit the ``context`` argument
        and pass in a non-null value for the ``data`` argument.
    
        This function is currently only compatible with Composer v1 environments.
        """
    
        # Fill in with your Composer info here
        # Navigate to your webserver's login page and get this from the URL
        # Or use the script found at
        # https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/composer/rest/get_client_id.py
        client_id = 'YOUR-CLIENT-ID'
        # This should be part of your webserver's URL:
        # {tenant-project-id}.appspot.com
        webserver_id = 'YOUR-TENANT-PROJECT'
        # The name of the DAG you wish to trigger
        dag_name = 'composer_sample_trigger_response_dag'
    
        if USE_EXPERIMENTAL_API:
            endpoint = f'api/experimental/dags/{dag_name}/dag_runs'
            json_data = {'conf': data, 'replace_microseconds': 'false'}
        else:
            endpoint = f'api/v1/dags/{dag_name}/dagRuns'
            json_data = {'conf': data}
        webserver_url = (
            'https://'
            + webserver_id
            + '.appspot.com/'
            + endpoint
        )
        # Make a POST request to IAP which then Triggers the DAG
        make_iap_request(
            webserver_url, client_id, method='POST', json=json_data)
    
    # This code is copied from
    # https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/iap/make_iap_request.py
    # START COPIED IAP CODE
    def make_iap_request(url, client_id, method='GET', **kwargs):
        """Makes a request to an application protected by Identity-Aware Proxy.
        Args:
          url: The Identity-Aware Proxy-protected URL to fetch.
          client_id: The client ID used by Identity-Aware Proxy.
          method: The request method to use
                  ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE')
          **kwargs: Any of the parameters defined for the request function:
                    https://github.com/requests/requests/blob/master/requests/api.py
                    If no timeout is provided, it is set to 90 by default.
        Returns:
          The page body, or raises an exception if the page couldn't be retrieved.
        """
        # Set the default timeout, if missing
        if 'timeout' not in kwargs:
            kwargs['timeout'] = 90
    
        # Obtain an OpenID Connect (OIDC) token from metadata server or using service
        # account.
        google_open_id_connect_token = id_token.fetch_id_token(Request(), client_id)
    
        # Fetch the Identity-Aware Proxy-protected URL, including an
        # Authorization header containing "Bearer " followed by a
        # Google-issued OpenID Connect token for the service account.
        resp = requests.request(
            method, url,
            headers={'Authorization': 'Bearer {}'.format(
                google_open_id_connect_token)}, **kwargs)
        if resp.status_code == 403:
            raise Exception('Service account does not have permission to '
                            'access the IAP-protected application.')
        elif resp.status_code != 200:
            raise Exception(
                'Bad response from application: {!r} / {!r} / {!r}'.format(
                    resp.status_code, resp.headers, resp.text))
        else:
            return resp.text
    # END COPIED IAP CODE
    

    함수 테스트

    함수 및 DAG가 의도한 대로 작동하는지 확인하기 위해 다음 안내를 따르세요.

    1. 함수가 배포될 때까지 기다립니다.
    2. 파일을 Cloud Storage 버킷에 업로드합니다. 또는 Google Cloud Console에서 함수 테스트 작업을 선택하여 함수를 수동으로 트리거할 수 있습니다.
    3. Airflow 웹 인터페이스에서 DAG 페이지를 확인합니다. DAG는 DAG 실행이 활성 상태 또는 이미 완료된 상태여야 합니다.
    4. Airflow UI에서 이 실행의 작업 로그를 확인합니다. print_gcs_info 태스크가 함수에서 수신된 데이터를 로그에 출력하는지 확인해야 합니다.
    [2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
    [2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
        {bucket: example-storage-for-gcf-triggers, contentType: text/plain,
        crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
        ... }
    [2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
        return code 0h
    

    다음 단계