Menjalankan DAG Apache Airflow di Cloud Composer 3 (Google Cloud CLI)
Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Panduan memulai ini menunjukkan cara membuat lingkungan Cloud Composer dan menjalankan DAG Apache Airflow di Cloud Composer 3.
Jika Anda baru menggunakan Airflow, lihat tutorial konsep Airflow dalam dokumentasi Apache Airflow untuk mengetahui informasi selengkapnya tentang konsep, objek, dan penggunaan Airflow.
Jika Anda ingin menggunakan konsol Google Cloud, lihat Menjalankan DAG Apache Airflow di Cloud Composer.
Jika Anda ingin membuat lingkungan menggunakan Terraform, lihat Membuat lingkungan (Terraform).
Sebelum memulai
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Cloud Composer API:
gcloud services enable composer.googleapis.com
-
Untuk mendapatkan izin yang Anda perlukan untuk menyelesaikan panduan memulai ini, minta administrator untuk memberi Anda peran IAM berikut di project Anda:
-
Untuk melihat, membuat, dan mengelola lingkungan Cloud Composer:
-
Environment and Storage Object Administrator (
roles/composer.environmentAndStorageObjectAdmin
) -
Service Account User (
roles/iam.serviceAccountUser
)
-
Environment and Storage Object Administrator (
-
Untuk melihat log:
Logs Viewer (
roles/logging.viewer
)
Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.
Anda mungkin juga bisa mendapatkan izin yang diperlukan melalui peran khusus atau peran bawaan lainnya.
-
Untuk melihat, membuat, dan mengelola lingkungan Cloud Composer:
Membuat akun layanan lingkungan
Saat membuat lingkungan, Anda harus menentukan akun layanan. Akun layanan ini disebut akun layanan lingkungan. Lingkungan Anda menggunakan akun layanan ini untuk melakukan sebagian besar operasi.
Akun layanan untuk lingkungan Anda bukan akun pengguna. Akun layanan adalah jenis akun khusus yang digunakan oleh aplikasi atau instance virtual machine (VM), bukan orang.
Untuk membuat akun layanan bagi lingkungan Anda:
Buat akun layanan baru, seperti yang dijelaskan dalam dokumentasi Identity and Access Management.
Berikan peran ke akun tersebut, seperti yang dijelaskan dalam dokumentasi Identity and Access Management. Peran yang diperlukan adalah Pekerja Composer (
composer.worker
).
Membuat lingkungan
Buat lingkungan baru bernama example-environment
di region us-central1
, dengan versi Cloud Composer 3 terbaru.
gcloud composer environments create example-environment \
--location us-central1 \
--image-version composer-3-airflow-2.9.3-build.7
Membuat file DAG
DAG Airflow adalah kumpulan tugas terorganisir yang ingin Anda jadwalkan dan jalankan. DAG ditentukan dalam file Python standar.
Panduan ini menggunakan contoh DAG Airflow yang ditentukan dalam file quickstart.py
.
Kode Python dalam file ini melakukan hal berikut:
- Membuat DAG,
composer_sample_dag
. DAG ini berjalan setiap hari. - Menjalankan satu tugas,
print_dag_run_conf
. Tugas ini mencetak konfigurasi eksekusi DAG menggunakan operator bash.
Simpan salinan file quickstart.py
di komputer lokal Anda:
Upload file DAG ke bucket lingkungan Anda
Setiap lingkungan Cloud Composer memiliki bucket Cloud Storage yang terkait. Airflow di Cloud Composer hanya menjadwalkan DAG yang berada di folder /dags
di bucket ini.
Untuk menjadwalkan DAG, upload quickstart.py
dari komputer lokal ke
folder /dags
lingkungan Anda:
Untuk mengupload quickstart.py
dengan Google Cloud CLI, jalankan perintah berikut di
folder tempat file quickstart.py
berada:
gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py
Melihat DAG
Setelah Anda mengupload file DAG, Airflow akan melakukan hal berikut:
- Mengurai file DAG yang Anda upload. Mungkin perlu waktu beberapa menit hingga DAG tersedia untuk Airflow.
- Menambahkan DAG ke daftar DAG yang tersedia.
- Menjalankan DAG sesuai dengan jadwal yang Anda berikan dalam file DAG.
Pastikan DAG Anda diproses tanpa error dan tersedia di Airflow dengan melihatnya di UI DAG. UI DAG adalah antarmuka Cloud Composer untuk melihat informasi DAG di konsol Google Cloud. Cloud Composer juga menyediakan akses ke UI Airflow, yang merupakan antarmuka web Airflow native.
Tunggu sekitar lima menit untuk memberi Airflow waktu guna memproses file DAG yang Anda upload sebelumnya, dan menyelesaikan operasi DAG pertama (akan dijelaskan nanti).
Jalankan perintah berikut di Google Cloud CLI. Perintah ini mengeksekusi perintah Airflow CLI
dags list
yang mencantumkan DAG di lingkungan Anda.gcloud composer environments run example-environment \ --location us-central1 \ dags list
Pastikan DAG
composer_quickstart
tercantum dalam output perintah.Contoh output:
Executing the command: [ airflow dags list ]... Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb Use ctrl-c to interrupt the command dag_id | filepath | owner | paused ====================+=======================+==================+======= airflow_monitoring | airflow_monitoring.py | airflow | False composer_quickstart | dag-quickstart-af2.py | Composer Example | False
Melihat detail operasi DAG
Satu eksekusi DAG disebut eksekusi DAG. Airflow segera menjalankan DAG untuk contoh DAG karena tanggal mulai dalam file DAG ditetapkan ke kemarin. Dengan cara ini, Airflow akan mengejar jadwal DAG yang ditentukan.
Contoh DAG berisi satu tugas, print_dag_run_conf
, yang menjalankan perintah echo
di konsol. Perintah ini menghasilkan informasi meta tentang DAG
(ID numerik DAG run).
Jalankan perintah berikut di Google Cloud CLI. Perintah ini mencantumkan DAG yang dijalankan
untuk DAG composer_quickstart
:
gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart
Contoh output:
dag_id | run_id | state | execution_date | start_date | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00
Airflow CLI tidak menyediakan perintah untuk melihat log tugas. Anda dapat menggunakan metode lain untuk melihat log tugas Airflow: UI DAG Cloud Composer, UI Airflow, atau Cloud Logging. Panduan ini menunjukkan cara mengkueri Cloud Logging untuk log dari DAG tertentu yang dijalankan.
Jalankan perintah berikut di Google Cloud CLI. Perintah ini membaca log dari Cloud Logging untuk DAG tertentu yang dijalankan dari DAG composer_quickstart
. Argumen
--format
memformat output sehingga hanya teks pesan log
yang ditampilkan.
gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"
Ganti:
RUN_ID
dengan nilairun_id
dari output perintahtasks states-for-dag-run
yang Anda jalankan sebelumnya. Misalnya,2024-02-17T15:38:38.969307+00:00
.
Contoh output:
...
Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task
...
Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746
...
Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check
Pembersihan
Agar tidak menimbulkan biaya pada akun Google Cloud Anda untuk resource yang digunakan pada halaman ini, hapus project Google Cloud yang berisi resource tersebut.
Hapus resource yang digunakan dalam tutorial ini:
Hapus lingkungan Cloud Composer:
Di konsol Google Cloud, buka halaman Environments.
Pilih
example-environment
, lalu klik Hapus.Tunggu hingga lingkungan dihapus.
Hapus bucket lingkungan Anda. Menghapus lingkungan Cloud Composer tidak akan menghapus bucket-nya.
Di konsol Google Cloud, buka halaman Storage > Browser.
Pilih bucket lingkungan, lalu klik Hapus. Misalnya, bucket ini dapat diberi nama
us-central1-example-environ-c1616fe8-bucket
.
Hapus persistent disk antrean Redis lingkungan Anda. Menghapus lingkungan Cloud Composer tidak akan menghapus persistent disk-nya.
Di konsol Google Cloud, buka Compute Engine > Disks.
Pilih persistent disk antrean Redis lingkungan, lalu klik Hapus.
Misalnya, disk ini dapat diberi nama
gke-us-central1-exampl-pvc-b12055b6-c92c-43ff-9de9-10f2cc6fc0ee
. Disk untuk Cloud Composer 1 selalu memiliki jenisStandard persistent disk
dan ukuran 2 GB.
Langkah selanjutnya