Menjalankan DAG Apache Airflow di Cloud Composer 2 (Google Cloud CLI)
Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Panduan memulai ini menunjukkan cara membuat lingkungan Cloud Composer dan menjalankan DAG Apache Airflow di Cloud Composer 2.
Jika Anda baru menggunakan Airflow, lihat Tutorial konsep Airflow di Apache Airflow untuk informasi lebih lanjut tentang konsep Airflow, objek, dan penggunaannya.
Jika Anda ingin menggunakan konsol Google Cloud, lihat Jalankan DAG Apache Airflow di Cloud Composer.
Jika Anda ingin membuat lingkungan menggunakan Terraform, lihat Membuat lingkungan (Terraform).
Sebelum memulai
- Login ke akun Google Cloud Anda. Jika Anda baru menggunakan Google Cloud, buat akun untuk mengevaluasi performa produk kami dalam skenario dunia nyata. Pelanggan baru juga mendapatkan kredit gratis senilai $300 untuk menjalankan, menguji, dan men-deploy workload.
- Menginstal Google Cloud CLI.
-
Untuk initialize gcloud CLI, jalankan perintah berikut:
gcloud init
-
Buat atau pilih project Google Cloud.
-
Membuat project Google Cloud:
gcloud projects create PROJECT_ID
Ganti
PROJECT_ID
dengan nama untuk project Google Cloud yang Anda buat. -
Pilih project Google Cloud yang Anda buat:
gcloud config set project PROJECT_ID
Ganti
PROJECT_ID
dengan nama project Google Cloud Anda.
-
-
Pastikan penagihan telah diaktifkan untuk project Google Cloud Anda.
- Menginstal Google Cloud CLI.
-
Untuk initialize gcloud CLI, jalankan perintah berikut:
gcloud init
-
Buat atau pilih project Google Cloud.
-
Membuat project Google Cloud:
gcloud projects create PROJECT_ID
Ganti
PROJECT_ID
dengan nama untuk project Google Cloud yang Anda buat. -
Pilih project Google Cloud yang Anda buat:
gcloud config set project PROJECT_ID
Ganti
PROJECT_ID
dengan nama project Google Cloud Anda.
-
-
Pastikan penagihan telah diaktifkan untuk project Google Cloud Anda.
-
Aktifkan API Cloud Composer:
gcloud services enable composer.googleapis.com
-
Untuk mendapatkan izin yang diperlukan dalam menyelesaikan panduan memulai ini, minta administrator untuk memberi Anda peran IAM berikut pada project Anda:
-
Untuk melihat dan mengelola lingkungan Cloud Composer:
-
Administrator Objek Lingkungan dan Penyimpanan (
roles/composer.environmentAndStorageObjectAdmin
) -
Service Account User (
roles/iam.serviceAccountUser
)
-
Administrator Objek Lingkungan dan Penyimpanan (
-
Untuk melihat log:
Logs Viewer (
roles/logging.viewer
)
Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.
Anda mungkin juga bisa mendapatkan izin yang diperlukan melalui peran khusus atau peran bawaan lainnya.
-
Untuk melihat dan mengelola lingkungan Cloud Composer:
Membuat lingkungan
Jika ini adalah lingkungan pertama
dalam proyek Anda, maka
menambahkan akun Agen Layanan Cloud Composer sebagai akun utama baru
di akun layanan lingkungan Anda dan memberikan
roles/composer.ServiceAgentV2Ext
peran tersebut.
Secara default, lingkungan Anda menggunakan akun layanan Compute Engine default, dan contoh berikut menunjukkan cara menambahkan izin ini ke dalamnya.
# Get current project's project number
PROJECT_NUMBER=$(gcloud projects list \
--filter="$(gcloud config get-value project)" \
--format="value(PROJECT_NUMBER)" \
--limit=1)
# Add the Cloud Composer v2 API Service Agent Extension role
gcloud iam service-accounts add-iam-policy-binding \
$PROJECT_NUMBER-compute@developer.gserviceaccount.com \
--member serviceAccount:service-$PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com \
--role roles/composer.ServiceAgentV2Ext
Buat lingkungan baru bernama example-environment
di us-central1
region, dengan versi Cloud Composer 2 terbaru.
gcloud composer environments create example-environment \
--location us-central1 \
--image-version composer-2.9.3-airflow-2.9.1
Membuat file DAG
DAG Airflow adalah kumpulan tugas terorganisir yang yang ingin Anda jadwalkan dan jalankan. DAG ditentukan dalam file Python standar.
Panduan ini menggunakan contoh DAG Airflow yang ditentukan dalam file quickstart.py
.
Kode Python dalam file ini melakukan hal berikut:
- Membuat DAG,
composer_sample_dag
. DAG ini berjalan setiap hari. - Menjalankan satu tugas,
print_dag_run_conf
. Tugas ini mencetak proses DAG konfigurasinya dengan menggunakan operator {i>bash<i}.
Simpan salinan file quickstart.py
di komputer lokal Anda:
Mengupload file DAG ke bucket lingkungan Anda
Setiap lingkungan Cloud Composer memiliki Cloud Storage
yang terkait dengannya. Hanya jadwal Airflow di Cloud Composer
DAG yang berada dalam folder /dags
di bucket ini.
Untuk menjadwalkan DAG, upload quickstart.py
dari komputer lokal Anda ke
folder /dags
lingkungan:
Untuk mengupload quickstart.py
dengan Google Cloud CLI, jalankan perintah berikut di
folder tempat file quickstart.py
berada:
gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py
Melihat DAG
Setelah Anda mengupload file DAG, Airflow akan melakukan hal berikut:
- Mengurai file DAG yang Anda upload. Mungkin diperlukan waktu beberapa menit DAG agar tersedia untuk Airflow.
- Menambahkan DAG ke daftar DAG yang tersedia.
- Mengeksekusi DAG sesuai dengan jadwal yang Anda berikan di file DAG.
Pastikan DAG Anda diproses tanpa error dan tersedia di Airflow dengan melihatnya di UI DAG. UI DAG adalah antarmuka Cloud Composer untuk melihat Informasi DAG di Konsol Google Cloud. Cloud Composer juga menyediakan akses ke UI Airflow, yang merupakan web Airflow native dalam antarmuka berbasis web yang sederhana.
Tunggu sekitar lima menit guna memberi Airflow waktu untuk memproses file DAG yang telah Anda upload sebelumnya, dan untuk menyelesaikan DAG pertama, (akan dijelaskan nanti).
Jalankan perintah berikut di Google Cloud CLI. Perintah ini menjalankan
dags list
Perintah CLI Airflow yang mencantumkan DAG di lingkungan fleksibel App Engine.gcloud composer environments run example-environment \ --location us-central1 \ dags list
Pastikan DAG
composer_quickstart
tercantum dalam output perintah.Contoh output:
Executing the command: [ airflow dags list ]... Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb Use ctrl-c to interrupt the command dag_id | filepath | owner | paused ====================+=======================+==================+======= airflow_monitoring | airflow_monitoring.py | airflow | False composer_quickstart | dag-quickstart-af2.py | Composer Example | False
Melihat detail operasi DAG
Satu eksekusi DAG disebut run DAG. Aliran udara segera menjalankan DAG untuk contoh DAG karena tanggal mulai di file DAG adalah ditetapkan ke kemarin. Dengan cara ini, Airflow akan mengejar jadwal proyek.
Contoh DAG berisi satu tugas, print_dag_run_conf
, yang menjalankan echo
di konsol. Perintah ini menghasilkan informasi meta tentang DAG
(ID numerik operasi DAG).
Jalankan perintah berikut di Google Cloud CLI. Perintah ini mencantumkan operasi DAG
untuk DAG composer_quickstart
:
gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart
Contoh output:
dag_id | run_id | state | execution_date | start_date | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00
Airflow CLI tidak menyediakan perintah untuk melihat log tugas. Anda dapat menggunakan metode lain untuk melihat log tugas Airflow: Cloud Composer DAG UI, Airflow UI, atau Cloud Logging. Panduan ini menunjukkan cara membuat kueri Cloud Logging untuk log dari proses DAG tertentu.
Jalankan perintah berikut di Google Cloud CLI. Perintah ini membaca log dari
Cloud Logging untuk operasi DAG tertentu dari DAG composer_quickstart
. Tujuan
Argumen --format
memformat output sehingga hanya teks pesan log
yang ditampilkan.
gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"
Ganti:
RUN_ID
dengan nilairun_id
dari outputtasks states-for-dag-run
yang Anda jalankan sebelumnya. Sebagai contoh,2024-02-17T15:38:38.969307+00:00
.
Contoh output:
...
Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task
...
Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746
...
Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check
Pembersihan
Agar tidak menimbulkan biaya pada akun Google Cloud Anda untuk resource yang digunakan pada halaman ini, hapus project Google Cloud yang berisi resource tersebut.
Hapus resource yang digunakan dalam tutorial ini:
Hapus lingkungan Cloud Composer:
Di Konsol Google Cloud, buka halaman Environments.
Pilih
example-environment
, lalu klik Hapus.Tunggu hingga lingkungan dihapus.
Hapus bucket lingkungan Anda. Menghapus Cloud Composer tidak akan menghapus bucket-nya.
Di Konsol Google Cloud, buka Storage > Browser.
Pilih bucket lingkungan dan klik Delete. Misalnya, bucket dapat diberi nama
us-central1-example-environ-c1616fe8-bucket
.
Hapus persistent disk dari antrean Redis lingkungan Anda. Menghapus Lingkungan Cloud Composer tidak menghapus persistent disk-nya.
Di Konsol Google Cloud, buka Compute Engine > Disk.
Pilih persistent disk antrean Redis lingkungan lingkungan Anda dan klik Hapus.
Misalnya, {i>disk<i} ini dapat berupa bernama
pvc-02bc4842-2312-4347-8519-d87bdcd31115
. Disk untuk Cloud Composer 2 selalu memiliki jenisBalanced persistent disk
dan berukuran 2 GB.
Langkah selanjutnya