Cloud Composer 2에서 Apache Airflow DAG 실행(Google Cloud CLI)
Cloud Composer 1 | Cloud Composer 2
이 빠른 시작 가이드에서는 Cloud Composer 환경을 만들고 Cloud Composer 2에서 Apache Airflow DAG를 실행하는 방법을 보여줍니다.
Airflow를 처음 사용하는 경우 Airflow 개념, 객체, 사용법에 대한 자세한 내용은 Apache Airflow 문서의 Airflow 개념 튜토리얼을 참조하세요.
대신 Google Cloud 콘솔을 사용하려면 Cloud Composer에서 Apache Airflow DAG 실행을 참조하세요.
Terraform을 사용하여 환경을 만들려면 환경 만들기(Terraform)를 참조하세요.
시작하기 전에
- Google Cloud 계정에 로그인합니다. Google Cloud를 처음 사용하는 경우 계정을 만들고 Google 제품의 실제 성능을 평가해 보세요. 신규 고객에게는 워크로드를 실행, 테스트, 배포하는 데 사용할 수 있는 $300의 무료 크레딧이 제공됩니다.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Enable the Cloud Composer API:
gcloud services enable composer.googleapis.com
-
빠른 시작을 완료하는 데 필요한 권한을 얻으려면 관리자에게 프로젝트에 대한 다음 IAM 역할을 부여해 달라고 요청하세요.
-
Cloud Composer 환경 만들기 및 관리를 확인하려면 다음 안내를 따르세요.
-
환경 및 스토리지 객체 관리자(
roles/composer.environmentAndStorageObjectAdmin
) -
서비스 계정 사용자(
roles/iam.serviceAccountUser
)
-
환경 및 스토리지 객체 관리자(
-
로그 보기:
로그 뷰어(
roles/logging.viewer
)
역할 부여에 대한 자세한 내용은 액세스 관리를 참조하세요.
-
Cloud Composer 환경 만들기 및 관리를 확인하려면 다음 안내를 따르세요.
환경 만들기
프로젝트의 첫 번째 환경인 경우 Cloud Composer 서비스 에이전트 계정을 환경 서비스 계정의 새 주 구성원으로 추가하고 roles/composer.ServiceAgentV2Ext
역할을 부여합니다.
기본적으로 환경은 기본 Compute Engine 서비스 계정을 사용하며 다음 예시는 여기에 이 권한을 추가하는 방법을 보여줍니다.
# Get current project's project number
PROJECT_NUMBER=$(gcloud projects list \
--filter="$(gcloud config get-value project)" \
--format="value(PROJECT_NUMBER)" \
--limit=1)
# Add the Cloud Composer v2 API Service Agent Extension role
gcloud iam service-accounts add-iam-policy-binding \
$PROJECT_NUMBER-compute@developer.gserviceaccount.com \
--member serviceAccount:service-$PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com \
--role roles/composer.ServiceAgentV2Ext
최신 Cloud Composer 2 버전으로 us-central1
리전에 example-environment
라는 새 환경을 만듭니다.
gcloud composer environments create example-environment \
--location us-central1 \
--image-version composer-2.8.1-airflow-2.7.3
DAG 파일 만들기
Airflow DAG는 예약 및 실행하려는 태스크가 구성된 모음입니다. DAG는 표준 Python 파일에서 정의됩니다.
이 가이드에서는 quickstart.py
파일에 정의된 Airflow DAG 예시를 사용합니다.
이 파일의 Python 코드는 다음을 수행합니다.
- DAG,
composer_sample_dag
를 만듭니다. 이 DAG는 매일 실행됩니다. - 태스크 하나,
print_dag_run_conf
를 실행합니다. 태스크는 bash 연산자를 사용하여 DAG 실행 구성을 출력합니다.
quickstart.py
파일의 사본을 로컬 머신에 저장합니다.
환경의 버킷에 DAG 업로드
모든 Cloud Composer 환경에는 Cloud Storage 버킷이 연결되어 있습니다. Cloud Composer의 Airflow는 이 버킷의 /dags
폴더에 있는 DAG만 예약합니다.
DAG를 예약하려면 로컬 머신에서 환경의 /dags
폴더로 quickstart.py
를 업로드합니다.
Google Cloud CLI로 quickstart.py
를 업로드하려면 quickstart.py
파일이 있는 폴더에서 다음 명령어를 실행합니다.
gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py
DAG 보기
DAG 파일을 업로드하면 Airflow는 다음을 수행합니다.
- 업로드한 DAG 파일을 파싱합니다. Airflow에서 DAG를 사용할 수 있게 되는 데 몇 분 정도 걸릴 수 있습니다.
- 사용 가능한 DAG 목록에 DAG를 추가합니다.
- DAG 파일에 제공한 일정에 따라 DAG를 실행합니다.
DAG UI에서 확인하여 DAG가 오류 없이 처리되고 Airflow에서 사용할 수 있는지 확인합니다. DAG UI는 Google Cloud 콘솔에서 DAG 정보를 볼 수 있는 Cloud Composer 인터페이스입니다. 또한 Cloud Composer는 기본 Airflow 웹 인터페이스인 Airflow UI에 대한 액세스를 제공합니다.
Airflow가 이전에 업로드한 DAG 파일을 처리하고 첫 번째 DAG 실행을 완료할 때까지(이후에 설명) 약 5분 정도 기다립니다.
Google Cloud CLI에서 다음 명령어를 실행합니다. 이 명령어는 환경의 DAG를 나열하는
dags list
Airflow CLI 명령어를 실행합니다.gcloud composer environments run example-environment \ --location us-central1 \ dags list
명령어 출력에
composer_quickstart
DAG가 나열되어 있는지 확인합니다.출력 예시:
Executing the command: [ airflow dags list ]... Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb Use ctrl-c to interrupt the command dag_id | filepath | owner | paused ====================+=======================+==================+======= airflow_monitoring | airflow_monitoring.py | airflow | False composer_quickstart | dag-quickstart-af2.py | Composer Example | False
DAG 실행 세부정보 보기
DAG를 한 번 실행하는 것을 DAG 실행이라고 합니다. DAG 파일의 시작 날짜가 어제로 설정되어 있으므로 Airflow는 예시 DAG의 DAG 실행을 즉시 실행합니다. Airflow는 이러한 방식으로 지정된 DAG의 일정을 따라잡습니다.
예시 DAG에는 콘솔에서 echo
명령어를 실행하는 print_dag_run_conf
태스크 하나가 포함되어 있습니다. 이 명령어는 DAG(DAG 실행의 숫자 식별자)에 대한 메타 정보를 출력합니다.
Google Cloud CLI에서 다음 명령어를 실행합니다. 이 명령어는 composer_quickstart
DAG의 DAG 실행을 나열합니다.
gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart
출력 예시:
dag_id | run_id | state | execution_date | start_date | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00
Airflow CLI는 태스크 로그를 보는 명령어를 제공하지 않습니다. Cloud Composer DAG UI, Airflow UI 또는 Cloud Logging 등 다른 방법을 사용하여 Airflow 태스크 로그를 확인할 수 있습니다. 이 가이드에서는 특정 DAG 실행의 로그에 대한 Cloud Logging을 쿼리하는 방법을 보여줍니다.
Google Cloud CLI에서 다음 명령어를 실행합니다. 이 명령어는 composer_quickstart
DAG의 특정 DAG 실행에 대한 Cloud Logging의 로그를 읽습니다. --format
인수는 로그 메시지의 텍스트만 표시하도록 출력 형식을 지정합니다.
gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"
다음과 같이 바꿉니다.
RUN_ID
를 이전에 실행한tasks states-for-dag-run
명령어 출력의run_id
값으로 바꿉니다. 예를 들면2024-02-17T15:38:38.969307+00:00
입니다.
출력 예시:
...
Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task
...
Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746
...
Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check
삭제
이 페이지에서 사용한 리소스 비용이 Google Cloud 계정에 청구되지 않도록 하려면 리소스가 포함된 Google Cloud 프로젝트를 삭제하면 됩니다.
이 튜토리얼에서 사용된 리소스를 삭제합니다.
Cloud Composer 환경을 삭제합니다.
Google Cloud 콘솔에서 환경 페이지로 이동합니다.
example-environment
를 선택하고 삭제를 클릭합니다.환경이 삭제될 때까지 기다립니다.
환경의 버킷을 삭제합니다. Cloud Composer 환경을 삭제하면 버킷이 삭제되지 않습니다.
Google Cloud 콘솔에서 스토리지 > 브라우저 페이지로 이동합니다.
해당 환경의 버킷을 선택하고 삭제를 클릭합니다. 예를 들어 이 버킷의 이름을
us-central1-example-environ-c1616fe8-bucket
으로 지정할 수 있습니다.
해당 환경의 Redis 큐의 영구 디스크를 삭제합니다. Cloud Composer 환경을 삭제해도 영구 디스크는 삭제되지 않습니다.
Google Cloud 콘솔에서 Compute Engine > 디스크로 이동합니다.
환경의 Redis 큐 영구 디스크를 선택하고 삭제를 클릭합니다.
예를 들어 이 디스크의 이름을
pvc-02bc4842-2312-4347-8519-d87bdcd31115
로 지정할 수 있습니다. Cloud Composer 2의 디스크 크기는 항상Balanced persistent disk
유형이며 크기는 2 GB입니다.
다음 단계