Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Halaman ini menjelaskan cara mengelola DAG di Cloud Composer lingkungan fleksibel App Engine.
Cloud Composer menggunakan bucket Cloud Storage untuk menyimpan DAG lingkungan Cloud Composer Anda. Lingkungan Anda akan disinkronkan DAG dari bucket ini ke komponen Airflow seperti pekerja Airflow dan penjadwal.
Sebelum memulai
- Karena Apache Airflow tidak menyediakan isolasi DAG yang kuat, sebaiknya Anda mempertahankan lingkungan produksi dan pengujian yang terpisah untuk mencegah gangguan DAG. Untuk informasi selengkapnya, lihat Menguji DAG.
- Pastikan akun Anda memiliki izin yang cukup untuk mengelola DAG.
- Perubahan pada DAG akan diterapkan ke Airflow dalam waktu 3-5 menit. Anda dapat melihat tugas status di antarmuka web Airflow.
Mengakses bucket lingkungan Anda
Untuk mengakses bucket yang terkait dengan lingkungan Anda:
Konsol
Di Konsol Google Cloud, buka halaman Environments.
Dalam daftar lingkungan, temukan baris dengan nama lingkungan Anda Di kolom DAGs folder, klik link DAGs. Tujuan Halaman Bucket details akan terbuka. Laporan ini menampilkan konten folder
/dags
di bucket lingkungan Anda.
gcloud
gcloud CLI memiliki perintah terpisah untuk tambahkan dan hapus DAG di bucket lingkungan Anda.
Jika ingin berinteraksi dengan bucket lingkungan, Anda juga dapat menggunakan Google Cloud CLI. Untuk mendapatkan alamat bucket lingkungan Anda, jalankan gcloud CLI berikut berikut:
gcloud composer environments describe ENVIRONMENT_NAME \
--location LOCATION \
--format="get(config.dagGcsPrefix)"
Ganti:
ENVIRONMENT_NAME
dengan nama lingkungan.LOCATION
dengan region tempat lingkungannya berada ditemukan.
Contoh:
gcloud beta composer environments describe example-environment \
--location us-central1 \
--format="get(config.dagGcsPrefix)"
API
Buat permintaan environments.get
API. Di beberapa
resource Environment, di
Resource EnvironmentConfig, di
resource dagGcsPrefix
adalah alamat bucket lingkungan Anda.
Contoh:
GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment
Python
Gunakan library google-auth untuk mendapatkan kredensial dan menggunakan library requests untuk memanggil REST API.
Menambahkan atau memperbarui DAG
Untuk menambahkan atau mengupdate DAG, pindahkan file .py
Python untuk DAG ke
folder /dags
di bucket lingkungan.
Konsol
Di Konsol Google Cloud, buka halaman Environments.
Dalam daftar lingkungan, temukan baris dengan nama lingkungan Anda Di kolom DAGs folder, klik link DAGs. Tujuan Halaman Bucket details akan terbuka. Laporan ini menampilkan konten folder
/dags
di bucket lingkungan Anda.Klik Upload files. Kemudian pilih file
.py
Python untuk DAG menggunakan dialog browser dan mengonfirmasi.
gcloud
gcloud composer environments storage dags import \
--environment ENVIRONMENT_NAME \
--location LOCATION \
--source="LOCAL_FILE_TO_UPLOAD"
Ganti:
ENVIRONMENT_NAME
dengan nama lingkungan.LOCATION
dengan region tempat lingkungannya berada ditemukan.LOCAL_FILE_TO_UPLOAD
adalah file.py
Python untuk DAG.
Contoh:
gcloud composer environments storage dags import \
--environment example-environment \
--location us-central1 \
--source="example_dag.py"
Mengupdate DAG yang memiliki operasi DAG aktif
Jika Anda mengupdate DAG yang menjalankan DAG aktif:
- Semua tugas yang sedang dijalankan selesai menggunakan file DAG asli.
- Semua tugas yang dijadwalkan tetapi tidak sedang berjalan menggunakan DAG.
- Semua tugas yang tidak lagi ada di file DAG yang diperbarui akan ditandai sebagai dihapus.
Mengupdate DAG yang berjalan dengan jadwal rutin
Setelah Anda mengupload file DAG, perlu waktu beberapa saat agar Airflow memuat file ini dan mengupdate DAG. Jika DAG berjalan dengan jadwal yang sering, Anda mungkin ingin pastikan DAG menggunakan file DAG versi terbaru. Untuk melakukannya:
Jeda DAG di UI Airflow.
Upload file DAG yang telah diperbarui.
Tunggu hingga Anda melihat update di UI Airflow. Ini berarti bahwa DAG diuraikan dengan benar oleh penjadwal dan diperbarui di database Airflow.
Jika UI Airflow menampilkan DAG yang diperbarui, ini tidak menjamin bahwa Pekerja Airflow memiliki file DAG versi terbaru. Hal ini sering terjadi karena file DAG disinkronkan secara independen untuk penjadwal dan pekerja.
Anda mungkin ingin memperpanjang waktu tunggu untuk memastikan bahwa file DAG telah disinkronkan dengan semua pekerja di lingkungan Anda. Sinkronisasi berlangsung beberapa kali per menit. Dalam lingkungan yang sehat, diperlukan 20–30 detik sudah cukup untuk sinkronisasi semua pekerja.
(Opsional) Jika Anda ingin benar-benar memastikan bahwa semua pekerja memiliki dari file DAG, memeriksa log untuk setiap pekerja. Untuk melakukannya:
Buka tab Logs untuk lingkungan Anda di Konsol Google Cloud.
Buka Log Composer > Infrastruktur > Item sinkronisasi Cloud Storage dan memeriksa log untuk setiap worker di lingkungan Anda. Cari log
Syncing dags directory
terbaru yang memiliki stempel waktu setelah Anda mengupload file DAG baru. Jika Anda melihat itemFinished syncing
yang mengikutinya, DAG akan berhasil disinkronkan pada pekerja ini.
Lanjutkan DAG.
Menghapus DAG di lingkungan Anda
Untuk menghapus DAG, hapus file .py
Python untuk DAG dari
folder /dags
lingkungan di bucket lingkungan Anda.
Konsol
Di Konsol Google Cloud, buka halaman Environments.
Dalam daftar lingkungan, temukan baris dengan nama lingkungan Anda Di kolom DAGs folder, klik link DAGs. Tujuan Halaman Bucket details akan terbuka. Ini menampilkan konten folder
/dags
di bucket lingkungan Anda.Pilih file DAG, klik Delete, dan konfirmasi operasi.
gcloud
gcloud composer environments storage dags delete \
--environment ENVIRONMENT_NAME \
--location LOCATION \
DAG_FILE
Ganti:
ENVIRONMENT_NAME
dengan nama lingkungan.LOCATION
dengan region tempat lingkungannya berada ditemukan.DAG_FILE
dengan file.py
Python untuk DAG.
Contoh:
gcloud composer environments storage dags delete \
--environment example-environment \
--location us-central1 \
example_dag.py
Menghapus DAG dari UI Airflow
Guna menghapus metadata untuk DAG dari antarmuka web Airflow:
UI Airflow
- Buka UI Airflow untuk lingkungan Anda.
- Untuk DAG, klik Delete DAG.
gcloud
Jalankan perintah berikut di gcloud CLI:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags delete -- DAG_NAME
Ganti:
ENVIRONMENT_NAME
dengan nama lingkungan.LOCATION
dengan region tempat lingkungan berada.DAG_NAME
adalah nama DAG yang akan dihapus.