Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
이 페이지에서는 Cloud Run Functions를 사용하여 이벤트에 대한 응답으로 Cloud Composer DAG를 트리거하는 방법을 설명합니다.
Apache Airflow는 DAG를 정기적으로 실행하도록 설계되어 있지만 이벤트에 대한 응답으로 DAG를 트리거할 수도 있습니다. 이를 수행하는 한 가지 방법은 Cloud Run Functions를 사용하여 지정된 이벤트가 발생할 때 Cloud Composer DAG를 트리거하는 것입니다.
이 가이드의 예는 Cloud Storage 버킷에서 변경사항이 발생할 때마다 DAG를 실행합니다. 버킷의 객체가 변경되면 함수가 트리거됩니다. 이 함수는 Cloud Composer 환경의 Airflow REST API를 요청합니다. Airflow가 이 요청을 처리하고 DAG를 실행합니다. DAG는 변경사항에 대한 정보를 출력합니다.
시작하기 전에
환경의 네트워킹 구성 확인
비공개 IP 및 VPC 서비스 제어 구성에서는 Cloud Run Functions에서 Airflow 웹 서버로의 연결을 구성할 수 없으므로 해당 구성에서는 이 솔루션이 작동하지 않습니다.
Cloud Composer 2에서 또 다른 접근 방법인 Cloud Run Functions 및 Pub/Sub 메시지를 사용하여 DAG 트리거를 사용할 수 있습니다.
프로젝트에 API 사용 설정
콘솔
Enable the Cloud Composer and Cloud Run functions APIs.
gcloud
Enable the Cloud Composer and Cloud Run functions APIs:
gcloud services enable cloudfunctions.googleapis.comcomposer.googleapis.com
Airflow REST API 사용 설정
Airflow 2의 경우 안정적인 REST API가 기본으로 사용 설정되어 있습니다. 환경에서 안정적인 API를 사용하지 않는 경우 안정적인 REST API를 사용 설정합니다.
웹 서버 액세스 제어를 사용하여 Airflow REST API에 대한 API 호출 허용
Cloud Run Functions는 IPv4 또는 IPv6 주소를 사용하여 Airflow REST API에 연결할 수 있습니다.
호출 IP 범위를 모르는 경우 웹 서버 액세스 제어의 기본 구성 옵션인 All IP addresses have access (default)
를 사용하여 Cloud Run Functions를 실수로 차단하지 않도록 합니다.
Cloud Storage 버킷 만들기
이 예시에서는 Cloud Storage 버킷의 변경사항에 대한 응답으로 DAG를 트리거합니다. 이 예시에 사용할 새 버킷을 만듭니다.
Airflow 웹 서버 URL 가져오기
이 예시에서는 Airflow 웹 서버 엔드포인트에 REST API 요청을 보냅니다. Cloud 함수 코드에서 Airflow 웹 서버 URL을 사용합니다.
콘솔
Google Cloud 콘솔에서 환경 페이지로 이동합니다.
환경 이름을 클릭합니다.
환경 세부정보 페이지에서 환경 구성 탭으로 이동합니다.
Airflow 웹 서버의 URL이 Airflow 웹 UI 항목에 나열됩니다.
gcloud
다음 명령어를 실행합니다.
gcloud composer environments describe ENVIRONMENT_NAME \
--location LOCATION \
--format='value(config.airflowUri)'
다음과 같이 바꿉니다.
ENVIRONMENT_NAME
을 환경 이름으로 바꿉니다.LOCATION
: 환경이 위치한 리전
환경에 DAG 업로드
환경에 DAG를 업로드합니다. 다음 예시 DAG는 수신된 DAG 실행 구성을 출력합니다. 이 가이드에서 나중에 만드는 함수로부터 이 DAG를 트리거합니다.
DAG를 트리거하는 Cloud 함수 배포
Cloud Run Functions 또는 Cloud Run에서 지원되는 선호 언어를 사용하여 Cloud 함수를 배포할 수 있습니다. 이 튜토리얼에서는 Python 및 Java로 구현된 Cloud 함수를 보여줍니다.
Cloud 함수 구성 매개변수 지정
트리거. 이 예시에서는 새 객체가 버킷에 생성될 때 또는 기존 객체를 덮어쓸 때 작동하는 트리거를 선택합니다.
트리거 유형. Cloud Storage
이벤트 유형. 완료/생성.
버킷. 이 함수를 트리거해야 하는 버킷을 선택합니다.
실패 시 재시도. 이 예시에서는 이 옵션을 사용 중지하는 것이 좋습니다. 프로덕션 환경에서 함수를 사용할 경우 일시적인 오류를 처리하기 위해 이 옵션을 사용 설정합니다.
런타임, 빌드, 연결, 보안 설정 섹션의 런타임 서비스 계정. 환경설정에 따라 다음 옵션 중 하나를 사용합니다.
Compute Engine 기본 서비스 계정을 선택합니다. 기본 IAM 권한의 경우 이 계정이 Cloud Composer 환경에 액세스하는 함수를 실행할 수 있습니다.
Composer User 역할이 있는 커스텀 서비스 계정을 만들고 이 함수에 대한 런타임 서비스 계정으로 지정합니다. 이 옵션은 최소 권한 원칙을 따릅니다.
코드 단계의 런타임 및 진입점.
(Python) 이 예시의 코드를 추가할 때 Python 3.7 또는 더 나중의 런타임을 선택하고
trigger_dag_gcf
를 시작점으로 지정합니다.(Java) 이 예시의 코드를 추가할 때 Java 17 런타임을 선택하고
com.example.Example
을 진입점으로 지정합니다.
요구사항 추가
Python
requirements.txt
파일에 종속 항목을 지정합니다.
자바
Google Cloud Functions UI에서 생성된 pom.xml
의 dependencies
섹션에 다음 종속 항목을 추가합니다.
<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-docs</artifactId>
<version>v1-rev20210707-1.32.1</version>
</dependency>
<dependency>
<groupId>com.google.api-client</groupId>
<artifactId>google-api-client</artifactId>
<version>1.32.1</version>
</dependency>
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-credentials</artifactId>
<version>1.14.0</version>
</dependency>
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-oauth2-http</artifactId>
<version>1.14.0</version>
</dependency>
Python
Airflow REST API를 사용한 DAG 트리거 코드 추가 composer2_airflow_rest_api.py
라는 파일을 만들고 이 파일에 Airflow REST API를 호출하는 코드를 놓습니다.
변수를 변경하지 마세요. Cloud 함수가 main.py
파일에서 이 파일을 가져옵니다.
다음 코드를 main.py
파일에 놓습니다. web_server_url
변수 값을 이전에 가져온 Airflow 웹 서버 주소로 바꿉니다.
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Trigger a DAG in a Cloud Composer 2 environment in response to an event,
using Cloud Functions.
"""
from typing import Any
import composer2_airflow_rest_api
def trigger_dag_gcf(data, context=None):
"""
Trigger a DAG and pass event data.
Args:
data: A dictionary containing the data for the event. Its format depends
on the event.
context: The context object for the event.
For more information about the arguments, see:
https://cloud.google.com/functions/docs/writing/background#function_parameters
"""
# TODO(developer): replace with your values
# Replace web_server_url with the Airflow web server address. To obtain this
# URL, run the following command for your environment:
# gcloud composer environments describe example-environment \
# --location=your-composer-region \
# --format="value(config.airflowUri)"
web_server_url = (
"https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com"
)
# Replace with the ID of the DAG that you want to run.
dag_id = 'composer_sample_trigger_response_dag'
composer2_airflow_rest_api.trigger_dag(web_server_url, dag_id, data)
자바
다음 코드를 Example.java
파일에 놓습니다. webServerUrl
변수 값을 이전에 가져온 Airflow 웹 서버 주소로 바꿉니다.
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.example;
import com.example.Example.GcsEvent;
import com.google.api.client.http.GenericUrl;
import com.google.api.client.http.HttpContent;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpRequestFactory;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpResponseException;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.http.json.JsonHttpContent;
import com.google.api.client.json.gson.GsonFactory;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;
/**
* Cloud Function that triggers an Airflow DAG in response to an event (in
* this case a Cloud Storage event).
*/
public class Example implements BackgroundFunction<GcsEvent> {
private static final Logger logger = Logger.getLogger(Example.class.getName());
// TODO(developer): replace with your values
// Replace webServerUrl with the Airflow web server address. To obtain this
// URL, run the following command for your environment:
// gcloud composer environments describe example-environment \
// --location=your-composer-region \
// --format="value(config.airflowUri)"
@Override
public void accept(GcsEvent event, Context context) throws Exception {
String webServerUrl = "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com";
String dagName = "composer_sample_trigger_response_dag";
String url = String.format("%s/api/v1/dags/%s/dagRuns", webServerUrl, dagName);
logger.info(String.format("Triggering DAG %s as a result of an event on the object %s.",
dagName, event.name));
logger.info(String.format("Triggering DAG via the following URL: %s", url));
GoogleCredentials googleCredentials = GoogleCredentials.getApplicationDefault()
.createScoped("https://www.googleapis.com/auth/cloud-platform");
HttpCredentialsAdapter credentialsAdapter = new HttpCredentialsAdapter(googleCredentials);
HttpRequestFactory requestFactory =
new NetHttpTransport().createRequestFactory(credentialsAdapter);
Map<String, Map<String, String>> json = new HashMap<String, Map<String, String>>();
Map<String, String> conf = new HashMap<String, String>();
conf.put("bucket", event.bucket);
conf.put("name", event.name);
conf.put("generation", event.generation);
conf.put("operation", context.eventType());
json.put("conf", conf);
HttpContent content = new JsonHttpContent(new GsonFactory(), json);
HttpRequest request = requestFactory.buildPostRequest(new GenericUrl(url), content);
request.getHeaders().setContentType("application/json");
HttpResponse response;
try {
response = request.execute();
int statusCode = response.getStatusCode();
logger.info("Response code: " + statusCode);
logger.info(response.parseAsString());
} catch (HttpResponseException e) {
// https://cloud.google.com/java/docs/reference/google-http-client/latest/com.google.api.client.http.HttpResponseException
logger.info("Received HTTP exception");
logger.info(e.getLocalizedMessage());
logger.info("- 400 error: wrong arguments passed to Airflow API");
logger.info("- 401 error: check if service account has Composer User role");
logger.info("- 403 error: check Airflow RBAC roles assigned to service account");
logger.info("- 404 error: check Web Server URL");
} catch (Exception e) {
logger.info("Received exception");
logger.info(e.getLocalizedMessage());
}
}
/** Details of the storage event. */
public static class GcsEvent {
/** Bucket name. */
String bucket;
/** Object name. */
String name;
/** Object version. */
String generation;
}
}
함수 테스트
함수 및 DAG가 의도한 대로 작동하는지 확인하기 위해 다음 안내를 따르세요.
- 함수가 배포될 때까지 기다립니다.
- 파일을 Cloud Storage 버킷에 업로드합니다. 또는 Google Cloud 콘솔에서 함수 테스트 작업을 선택하여 함수를 수동으로 트리거할 수 있습니다.
- Airflow 웹 인터페이스에서 DAG 페이지를 확인합니다. DAG는 DAG 실행이 활성 상태 또는 이미 완료된 상태여야 합니다.
- Airflow UI에서 이 실행의 작업 로그를 확인합니다.
print_gcs_info
태스크가 함수에서 수신된 데이터를 로그에 출력하는지 확인해야 합니다.
Python
[2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
[2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
{bucket: example-storage-for-gcf-triggers, contentType: text/plain,
crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
... }
[2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
return code 0h
자바
[2023-02-08, 08:00:09 UTC] {subprocess.py:86} INFO - Output:
[2023-02-08, 08:00:09 UTC] {subprocess.py:93} INFO - {bucket: example-storage-for-gcf-triggers, generation: 1675843189006715, name: file.txt, operation: google.storage.object.create}
[2023-02-08, 08:00:09 UTC] {subprocess.py:97} INFO - Command exited with return code 0