Cloud Functions 및 Airflow REST API로 Cloud Composer DAG 트리거

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

이 페이지에서는 Cloud Functions를 사용하여 이벤트에 대한 응답으로 Cloud Composer DAG를 트리거하는 방법을 설명합니다.

Apache Airflow는 DAG를 정기적으로 실행하도록 설계되어 있지만 이벤트에 대한 응답으로 DAG를 트리거할 수도 있습니다. 이를 수행하는 한 가지 방법은 Cloud Functions를 사용하여 지정된 이벤트가 발생할 때 Cloud Composer DAG를 트리거하는 것입니다.

이 가이드의 예시는 Cloud Storage 버킷에서 변경사항이 발생할 때마다 DAG를 실행합니다. 버킷의 객체가 변경되면 함수가 트리거됩니다. 이 함수는 Cloud Composer 환경의 Airflow REST API를 요청합니다. Airflow가 이 요청을 처리하고 DAG를 실행합니다. DAG는 변경사항에 대한 정보를 출력합니다.

시작하기 전에

환경의 네트워킹 구성 확인

비공개 IP 및 VPC 서비스 제어 구성에서는 Cloud Functions에서 Airflow 웹 서버로의 연결을 구성할 수 없으므로 해당 구성에서는 이 솔루션이 작동하지 않습니다.

Cloud Composer 2에서 또 다른 접근 방법인 Cloud Functions 및 Pub/Sub 메시지를 사용하여 DAG 트리거를 사용할 수 있습니다.

프로젝트에 API 사용 설정

콘솔

Enable the Cloud Composer and Cloud Functions APIs.

Enable the APIs

gcloud

Enable the Cloud Composer and Cloud Functions APIs:

gcloud services enable cloudfunctions.googleapis.com composer.googleapis.com

Airflow REST API 사용 설정

Airflow 2의 경우 안정적인 REST API가 기본으로 사용 설정되어 있습니다. 환경에서 안정적인 API를 사용하지 않는 경우 안정적인 REST API를 사용 설정합니다.

웹 서버 액세스 제어를 사용하여 Airflow REST API에 대한 API 호출 허용

Cloud Functions는 IPv4 또는 IPv6 주소를 사용하여 Airflow REST API에 연결할 수 있습니다.

호출 IP 범위를 모르는 경우 웹 서버 액세스 제어의 기본 구성 옵션인 All IP addresses have access (default)를 사용하여 Cloud Functions를 실수로 차단하지 않도록 합니다.

Cloud Storage 버킷 만들기

이 예시에서는 Cloud Storage 버킷의 변경사항에 대한 응답으로 DAG를 트리거합니다. 이 예시에 사용할 새 버킷을 만듭니다.

Airflow 웹 서버 URL 가져오기

이 예시에서는 Airflow 웹 서버 엔드포인트에 REST API 요청을 보냅니다. Cloud 함수 코드에서 Airflow 웹 서버 URL을 사용합니다.

콘솔

  1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

    환경으로 이동

  2. 환경 이름을 클릭합니다.

  3. 환경 세부정보 페이지에서 환경 구성 탭으로 이동합니다.

  4. Airflow 웹 서버의 URL이 Airflow 웹 UI 항목에 나열됩니다.

gcloud

다음 명령어를 실행합니다.

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format='value(config.airflowUri)'

다음과 같이 바꿉니다.

  • ENVIRONMENT_NAME을 환경 이름으로 바꿉니다.
  • LOCATION: 환경이 위치한 리전

환경에 DAG 업로드

환경에 DAG를 업로드합니다. 다음 예시 DAG는 수신된 DAG 실행 구성을 출력합니다. 이 가이드에서 나중에 만드는 함수로부터 이 DAG를 트리거합니다.

import datetime

import airflow
from airflow.operators.bash import BashOperator


with airflow.DAG(
    "composer_sample_trigger_response_dag",
    start_date=datetime.datetime(2021, 1, 1),
    # Not scheduled, trigger only
    schedule_interval=None,
) as dag:
    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = BashOperator(
        task_id="print_gcs_info", bash_command="echo {{ dag_run.conf }}"
    )

DAG를 트리거하는 Cloud 함수 배포

Cloud Functions 또는 Cloud Run에서 지원되는 선호 언어를 사용하여 Cloud 함수를 배포할 수 있습니다. 이 튜토리얼에서는 PythonJava로 구현된 Cloud 함수를 보여줍니다.

Cloud 함수 구성 매개변수 지정

  • 트리거. 이 예시에서는 새 객체가 버킷에 생성될 때 또는 기존 객체를 덮어쓸 때 작동하는 트리거를 선택합니다.

    • 트리거 유형. 결정 트리를 살펴보겠습니다

    • 이벤트 유형. 완료/생성.

    • 버킷. 이 함수를 트리거해야 하는 버킷을 선택합니다.

    • 실패 시 재시도. 이 예시에서는 이 옵션을 사용 중지하는 것이 좋습니다. 프로덕션 환경에서 함수를 사용할 경우 일시적인 오류를 처리하기 위해 이 옵션을 사용 설정합니다.

  • 런타임, 빌드, 연결, 보안 설정 섹션의 런타임 서비스 계정. 환경설정에 따라 다음 옵션 중 하나를 사용합니다.

    • Compute Engine 기본 서비스 계정을 선택합니다. 기본 IAM 권한의 경우 이 계정이 Cloud Composer 환경에 액세스하는 함수를 실행할 수 있습니다.

    • Composer User 역할이 있는 커스텀 서비스 계정을 만들고 이 함수에 대한 런타임 서비스 계정으로 지정합니다. 이 옵션은 최소 권한 원칙을 따릅니다.

  • 코드 단계의 런타임 및 진입점.

    • (Python) 이 예시의 코드를 추가할 때 Python 3.7 또는 더 나중의 런타임을 선택하고 trigger_dag_gcf를 시작점으로 지정합니다.

    • (Java) 이 예시의 코드를 추가할 때 Java 17 런타임을 선택하고 com.example.Example을 진입점으로 지정합니다.

요구사항 추가

Python

requirements.txt 파일에 종속 항목을 지정합니다.

google-auth==2.19.1
requests==2.32.2

자바

Google Cloud Functions UI에서 생성된 pom.xmldependencies 섹션에 다음 종속 항목을 추가합니다.

    <dependency>
      <groupId>com.google.apis</groupId>
      <artifactId>google-api-services-docs</artifactId>
      <version>v1-rev20210707-1.32.1</version>
    </dependency>
    <dependency>
      <groupId>com.google.api-client</groupId>
      <artifactId>google-api-client</artifactId>
      <version>1.32.1</version>
    </dependency>
    <dependency>
      <groupId>com.google.auth</groupId>
      <artifactId>google-auth-library-credentials</artifactId>
      <version>1.14.0</version>
    </dependency>
    <dependency>
      <groupId>com.google.auth</groupId>
      <artifactId>google-auth-library-oauth2-http</artifactId>
      <version>1.14.0</version>
    </dependency>

Python

Airflow REST API를 사용한 DAG 트리거 코드 추가 composer2_airflow_rest_api.py라는 파일을 만들고 이 파일에 Airflow REST API를 호출하는 코드를 놓습니다.

변수를 변경하지 마세요. Cloud 함수가 main.py 파일에서 이 파일을 가져옵니다.

from __future__ import annotations

from typing import Any

import google.auth
from google.auth.transport.requests import AuthorizedSession
import requests


# Following GCP best practices, these credentials should be
# constructed at start-up time and used throughout
# https://cloud.google.com/apis/docs/client-libraries-best-practices
AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])


def make_composer2_web_server_request(
    url: str, method: str = "GET", **kwargs: Any
) -> google.auth.transport.Response:
    """
    Make a request to Cloud Composer 2 environment's web server.
    Args:
      url: The URL to fetch.
      method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
        'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                  If no timeout is provided, it is set to 90 by default.
    """

    authed_session = AuthorizedSession(CREDENTIALS)

    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    return authed_session.request(method, url, **kwargs)


def trigger_dag(web_server_url: str, dag_id: str, data: dict) -> str:
    """
    Make a request to trigger a dag using the stable Airflow 2 REST API.
    https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html

    Args:
      web_server_url: The URL of the Airflow 2 web server.
      dag_id: The DAG ID.
      data: Additional configuration parameters for the DAG run (json).
    """

    endpoint = f"api/v1/dags/{dag_id}/dagRuns"
    request_url = f"{web_server_url}/{endpoint}"
    json_data = {"conf": data}

    response = make_composer2_web_server_request(
        request_url, method="POST", json=json_data
    )

    if response.status_code == 403:
        raise requests.HTTPError(
            "You do not have a permission to perform this operation. "
            "Check Airflow RBAC roles for your account."
            f"{response.headers} / {response.text}"
        )
    elif response.status_code != 200:
        response.raise_for_status()
    else:
        return response.text

다음 코드를 main.py 파일에 놓습니다. web_server_url 변수 값을 이전에 가져온 Airflow 웹 서버 주소로 바꿉니다.

# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Trigger a DAG in a Cloud Composer 2 environment in response to an event,
using Cloud Functions.
"""

from typing import Any

import composer2_airflow_rest_api

def trigger_dag_gcf(data, context=None):
    """
    Trigger a DAG and pass event data.

    Args:
      data: A dictionary containing the data for the event. Its format depends
      on the event.
      context: The context object for the event.

    For more information about the arguments, see:
    https://cloud.google.com/functions/docs/writing/background#function_parameters
    """

    # TODO(developer): replace with your values
    # Replace web_server_url with the Airflow web server address. To obtain this
    # URL, run the following command for your environment:
    # gcloud composer environments describe example-environment \
    #  --location=your-composer-region \
    #  --format="value(config.airflowUri)"
    web_server_url = (
        "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com"
    )
    # Replace with the ID of the DAG that you want to run.
    dag_id = 'composer_sample_trigger_response_dag'

    composer2_airflow_rest_api.trigger_dag(web_server_url, dag_id, data)

자바

다음 코드를 Example.java 파일에 놓습니다. webServerUrl 변수 값을 이전에 가져온 Airflow 웹 서버 주소로 바꿉니다.


// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.example;

import com.example.Example.GcsEvent;
import com.google.api.client.http.GenericUrl;
import com.google.api.client.http.HttpContent;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpRequestFactory;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpResponseException;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.http.json.JsonHttpContent;
import com.google.api.client.json.gson.GsonFactory;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

/**
 * Cloud Function that triggers an Airflow DAG in response to an event (in
 * this case a Cloud Storage event).
 */
public class Example implements BackgroundFunction<GcsEvent> {
  private static final Logger logger = Logger.getLogger(Example.class.getName());

  // TODO(developer): replace with your values
  // Replace webServerUrl with the Airflow web server address. To obtain this
  // URL, run the following command for your environment:
  // gcloud composer environments describe example-environment \
  //  --location=your-composer-region \
  //  --format="value(config.airflowUri)"
  @Override
  public void accept(GcsEvent event, Context context) throws Exception {
    String webServerUrl = "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com";
    String dagName = "composer_sample_trigger_response_dag";
    String url = String.format("%s/api/v1/dags/%s/dagRuns", webServerUrl, dagName);

    logger.info(String.format("Triggering DAG %s as a result of an event on the object %s.",
      dagName, event.name));
    logger.info(String.format("Triggering DAG via the following URL: %s", url));

    GoogleCredentials googleCredentials = GoogleCredentials.getApplicationDefault()
        .createScoped("https://www.googleapis.com/auth/cloud-platform");
    HttpCredentialsAdapter credentialsAdapter = new HttpCredentialsAdapter(googleCredentials);
    HttpRequestFactory requestFactory =
      new NetHttpTransport().createRequestFactory(credentialsAdapter);

    Map<String, Map<String, String>> json = new HashMap<String, Map<String, String>>();
    Map<String, String> conf = new HashMap<String, String>();
    conf.put("bucket", event.bucket);
    conf.put("name", event.name);
    conf.put("generation", event.generation);
    conf.put("operation", context.eventType());
    json.put("conf", conf);
    HttpContent content = new JsonHttpContent(new GsonFactory(), json);
    HttpRequest request = requestFactory.buildPostRequest(new GenericUrl(url), content);
    request.getHeaders().setContentType("application/json");
    HttpResponse response;
    try {
      response = request.execute();
      int statusCode = response.getStatusCode();
      logger.info("Response code: " + statusCode);
      logger.info(response.parseAsString());
    } catch (HttpResponseException e) {
      // https://cloud.google.com/java/docs/reference/google-http-client/latest/com.google.api.client.http.HttpResponseException
      logger.info("Received HTTP exception");
      logger.info(e.getLocalizedMessage());
      logger.info("- 400 error: wrong arguments passed to Airflow API");
      logger.info("- 401 error: check if service account has Composer User role");
      logger.info("- 403 error: check Airflow RBAC roles assigned to service account");
      logger.info("- 404 error: check Web Server URL");
    } catch (Exception e) {
      logger.info("Received exception");
      logger.info(e.getLocalizedMessage());
    }
  }

  /** Details of the storage event. */
  public static class GcsEvent {
    /** Bucket name. */
    String bucket;
    /** Object name. */
    String name;
    /** Object version. */
    String generation;
  }
}

함수 테스트

함수 및 DAG가 의도한 대로 작동하는지 확인하기 위해 다음 안내를 따르세요.

  1. 함수가 배포될 때까지 기다립니다.
  2. 파일을 Cloud Storage 버킷에 업로드합니다. 또는 Google Cloud 콘솔에서 함수 테스트 작업을 선택하여 함수를 수동으로 트리거할 수 있습니다.
  3. Airflow 웹 인터페이스에서 DAG 페이지를 확인합니다. DAG는 DAG 실행이 활성 상태 또는 이미 완료된 상태여야 합니다.
  4. Airflow UI에서 이 실행의 작업 로그를 확인합니다. print_gcs_info 태스크가 함수에서 수신된 데이터를 로그에 출력하는지 확인해야 합니다.

Python

[2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
[2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
    {bucket: example-storage-for-gcf-triggers, contentType: text/plain,
    crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
    ... }
[2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
    return code 0h

자바

[2023-02-08, 08:00:09 UTC] {subprocess.py:86} INFO - Output:
[2023-02-08, 08:00:09 UTC] {subprocess.py:93} INFO - {bucket: example-storage-for-gcf-triggers, generation: 1675843189006715, name: file.txt, operation: google.storage.object.create}
[2023-02-08, 08:00:09 UTC] {subprocess.py:97} INFO - Command exited with return code 0

다음 단계