Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
이 페이지에서는 Cloud Run Functions를 사용하여 이벤트에 대한 응답으로 Cloud Composer DAG를 트리거하는 방법을 설명합니다.
Apache Airflow는 DAG를 정기적으로 실행하도록 설계되어 있지만 이벤트에 대한 응답으로 DAG를 트리거할 수도 있습니다. 이를 수행하는 한 가지 방법은 Cloud Run Functions를 사용하여 지정된 이벤트가 발생할 때 Cloud Composer DAG를 트리거하는 것입니다.
이 가이드의 예는 Cloud Storage 버킷에서 변경사항이 발생할 때마다 DAG를 실행합니다. 버킷의 객체가 변경되면 함수가 트리거됩니다. 이 함수는 Cloud Composer 환경의 Airflow REST API를 요청합니다. Airflow가 이 요청을 처리하고 DAG를 실행합니다. DAG는 변경사항에 대한 정보를 출력합니다.
시작하기 전에
환경의 네트워킹 구성 확인
비공개 IP 및 VPC 서비스 제어 구성에서는 Cloud Run Functions에서 Airflow 웹 서버로의 연결을 구성할 수 없으므로 해당 구성에서는 이 솔루션이 작동하지 않습니다.
Cloud Composer 2에서 또 다른 접근 방법인 Cloud Run Functions 및 Pub/Sub 메시지를 사용하여 DAG 트리거를 사용할 수 있습니다.
프로젝트에 API 사용 설정
콘솔
Enable the Cloud Composer and Cloud Run functions APIs.
gcloud
Enable the Cloud Composer and Cloud Run functions APIs:
gcloud services enable cloudfunctions.googleapis.comcomposer.googleapis.com
Airflow REST API 사용 설정
Airflow 버전에 따라 다음 안내를 따르세요.
- Airflow 2의 경우 안정적인 REST API가 기본으로 사용 설정되어 있습니다. 환경에서 안정적인 API를 사용하지 않는 경우 안정적인 REST API를 사용 설정합니다.
- Airflow 1의 경우 실험용 REST API를 사용 설정합니다.
웹 서버 액세스 제어를 사용하여 Airflow REST API에 대한 API 호출 허용
Cloud Run Functions는 IPv4 또는 IPv6 주소를 사용하여 Airflow REST API에 연결할 수 있습니다.
호출 IP 범위를 모르는 경우 웹 서버 액세스 제어의 기본 구성 옵션인 All IP addresses have access (default)
를 사용하여 Cloud Run Functions를 실수로 차단하지 않도록 합니다.
Cloud Storage 버킷 만들기
이 예시에서는 Cloud Storage 버킷의 변경사항에 대한 응답으로 DAG를 트리거합니다. 이 예시에 사용할 새 버킷을 만듭니다.
Airflow 웹 서버 URL 가져오기
이 예시에서는 Airflow 웹 서버 엔드포인트에 REST API 요청을 보냅니다.
Cloud 함수 코드에서 .appspot.com
앞에 Airflow 웹 인터페이스 URL의 일부를 사용합니다.
콘솔
Google Cloud 콘솔에서 환경 페이지로 이동합니다.
환경 이름을 클릭합니다.
환경 세부정보 페이지에서 환경 구성 탭으로 이동합니다.
Airflow 웹 서버의 URL이 Airflow 웹 UI 항목에 나열됩니다.
gcloud
다음 명령어를 실행합니다.
gcloud composer environments describe ENVIRONMENT_NAME \
--location LOCATION \
--format='value(config.airflowUri)'
다음과 같이 바꿉니다.
ENVIRONMENT_NAME
: 환경 이름LOCATION
을 환경이 위치한 리전으로 바꿉니다.
IAM 프록시의 client_id 가져오기
Airflow REST API 엔드포인트를 요청하기 위해 함수에는 Airflow 웹 서버를 보호하는 Identity and Access Management 프록시의 클라이언트 ID가 필요합니다.
Cloud Composer는 이 정보를 직접 제공하지 않습니다. 대신 Airflow 웹 서버에 인증되지 않은 요청을 하고 리디렉션 URL에서 클라이언트 ID를 캡처합니다.
cURL
curl -v AIRFLOW_URL 2>&1 >/dev/null | grep -o "client_id\=[A-Za-z0-9-]*\.apps\.googleusercontent\.com"
AIRFLOW_URL
를 Airflow 웹 인터페이스의 URL로 바꿉니다.
출력에서 client_id
다음에 오는 문자열을 검색합니다. 예를 들면 다음과 같습니다.
client_id=836436932391-16q2c5f5dcsfnel77va9bvf4j280t35c.apps.googleusercontent.com
Python
다음 코드를 get_client_id.py
파일에 저장합니다. project_id
, location
, composer_environment
값을 입력한 후 Cloud Shell이나 로컬 환경에서 코드를 실행합니다.
환경에 DAG 업로드
환경에 DAG를 업로드합니다. 다음 예시 DAG는 수신된 DAG 실행 구성을 출력합니다. 이 가이드에서 나중에 만드는 함수로부터 이 DAG를 트리거합니다.
DAG를 트리거하는 Cloud 함수 배포
Cloud Run Functions 또는 Cloud Run에서 지원되는 선호 언어를 사용하여 Cloud 함수를 배포할 수 있습니다. 이 튜토리얼에서는 Python 및 Java로 구현된 Cloud 함수를 보여줍니다.
Cloud 함수 구성 매개변수 지정
트리거. 이 예시에서는 새 객체가 버킷에 생성될 때 또는 기존 객체를 덮어쓸 때 작동하는 트리거를 선택합니다.
트리거 유형. Cloud Storage
이벤트 유형. 완료/생성.
버킷. 이 함수를 트리거해야 하는 버킷을 선택합니다.
실패 시 재시도. 이 예시에서는 이 옵션을 사용 중지하는 것이 좋습니다. 프로덕션 환경에서 함수를 사용할 경우 일시적인 오류를 처리하기 위해 이 옵션을 사용 설정합니다.
런타임, 빌드, 연결, 보안 설정 섹션의 런타임 서비스 계정. 환경설정에 따라 다음 옵션 중 하나를 사용합니다.
Compute Engine 기본 서비스 계정을 선택합니다. 기본 IAM 권한의 경우 이 계정이 Cloud Composer 환경에 액세스하는 함수를 실행할 수 있습니다.
Composer User 역할이 있는 커스텀 서비스 계정을 만들고 이 함수에 대한 런타임 서비스 계정으로 지정합니다. 이 옵션은 최소 권한 원칙을 따릅니다.
코드 단계의 런타임 및 진입점. 이 예시의 코드를 추가할 때 Python 3.7 또는 더 최근의 런타임을 선택하고
trigger_dag
를 시작점으로 지정합니다.
요구사항 추가
requirements.txt
파일에 종속 항목을 지정합니다.
다음 코드를 main.py
파일에 넣고 다음과 같이 바꿉니다.
client_id
변수 값을 이전에 가져온client_id
값으로 바꿉니다.webserver_id
변수 값을.appspot.com
앞의 Airflow 웹 인터페이스 URL에 포함된 테넌트 프로젝트 ID로 바꿉니다. 이전에 Airflow 웹 인터페이스 URL을 가져왔습니다.사용할 Airflow REST API 버전을 지정합니다.
- 안정적인 Airflow REST API를 사용하는 경우
USE_EXPERIMENTAL_API
변수를False
로 설정합니다. - 실험용 Airflow REST API를 사용하는 경우 변경할 필요가 없습니다.
USE_EXPERIMENTAL_API
변수가 이미True
로 설정되어 있습니다.
- 안정적인 Airflow REST API를 사용하는 경우
함수 테스트
함수 및 DAG가 의도한 대로 작동하는지 확인하기 위해 다음 안내를 따르세요.
- 함수가 배포될 때까지 기다립니다.
- 파일을 Cloud Storage 버킷에 업로드합니다. 또는 Google Cloud 콘솔에서 함수 테스트 작업을 선택하여 함수를 수동으로 트리거할 수 있습니다.
- Airflow 웹 인터페이스에서 DAG 페이지를 확인합니다. DAG는 DAG 실행이 활성 상태 또는 이미 완료된 상태여야 합니다.
- Airflow UI에서 이 실행의 작업 로그를 확인합니다.
print_gcs_info
태스크가 함수에서 수신된 데이터를 로그에 출력하는지 확인해야 합니다.
[2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
[2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
{bucket: example-storage-for-gcf-triggers, contentType: text/plain,
crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
... }
[2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
return code 0h