Menambahkan dan mengupdate DAG

Cloud Composer 1 | Cloud Composer 2

Halaman ini menjelaskan cara mengelola DAG di lingkungan Cloud Composer Anda.

Cloud Composer menggunakan bucket Cloud Storage untuk menyimpan DAG lingkungan Cloud Composer Anda. Lingkungan Anda menyinkronkan DAG dari bucket ini ke komponen Airflow seperti pekerja dan penjadwal Airflow.

Sebelum memulai

  • Karena Apache Airflow tidak menyediakan isolasi DAG yang kuat, sebaiknya Anda mempertahankan lingkungan produksi dan pengujian yang terpisah untuk mencegah interferensi DAG. Untuk informasi selengkapnya, lihat Menguji DAG.
  • Pastikan akun Anda memiliki izin yang cukup untuk mengelola DAG.
  • Perubahan pada DAG akan disebarkan ke Airflow dalam waktu 3-5 menit. Anda dapat melihat status tugas di Airflow web interface.

Mengakses bucket lingkungan Anda

Untuk mengakses bucket yang terkait dengan lingkungan Anda:

Konsol

  1. Di konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  2. Dalam daftar lingkungan, temukan baris dengan nama lingkungan Anda, dan di kolom DAGs folder, klik link DAGs. Halaman Bucket details akan terbuka. Ini akan menampilkan konten folder /dags di bucket lingkungan Anda.

gcloud

gcloud CLI memiliki perintah terpisah untuk menambahkan dan menghapus DAG di bucket lingkungan Anda.

Jika ingin berinteraksi dengan bucket lingkungan, Anda juga dapat menggunakan alat command line gsutil. Untuk mendapatkan alamat bucket lingkungan Anda, jalankan perintah gcloud CLI berikut:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="get(config.dagGcsPrefix)"

Ganti:

  • ENVIRONMENT_NAME dengan nama lingkungan.
  • LOCATION dengan region tempat lingkungan berada.

Contoh:

gcloud beta composer environments describe example-environment \
    --location us-central1 \
    --format="get(config.dagGcsPrefix)"

API

Buat permintaan environments.get API. Di resource Environment, di resource EnvironmentConfig, resource dagGcsPrefix adalah alamat bucket lingkungan Anda.

Contoh:

GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment

Python

Gunakan library google-auth untuk mendapatkan kredensial dan menggunakan library requests untuk memanggil REST API.

import google.auth
import google.auth.transport.requests

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
response = authed_session.request("GET", environment_url)
environment_data = response.json()

# Print the bucket name from the response body.
print(environment_data["config"]["dagGcsPrefix"])

Menambahkan atau mengupdate DAG

Untuk menambahkan atau memperbarui DAG, pindahkan file .py Python untuk DAG ke folder /dags di bucket lingkungan.

Konsol

  1. Di konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  2. Dalam daftar lingkungan, temukan baris dengan nama lingkungan Anda, dan di kolom DAGs folder, klik link DAGs. Halaman Bucket details akan terbuka. Ini akan menampilkan konten folder /dags di bucket lingkungan Anda.

  3. Klik Upload files. Kemudian, pilih file .py Python untuk DAG menggunakan dialog browser dan konfirmasi.

gcloud

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source="LOCAL_FILE_TO_UPLOAD"

Ganti:

  • ENVIRONMENT_NAME dengan nama lingkungan.
  • LOCATION dengan region tempat lingkungan berada.
  • LOCAL_FILE_TO_UPLOAD adalah file .py Python untuk DAG.

Contoh:

gcloud beta composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    --source="example_dag.py"

Mengupdate DAG yang memiliki operasi DAG aktif

Jika Anda mengupdate DAG yang memiliki DAG aktif yang berjalan:

  • Semua tugas yang sedang dijalankan selesai menggunakan file DAG asli.
  • Semua tugas yang dijadwalkan tetapi saat ini tidak berjalan, menggunakan file DAG yang telah diupdate.
  • Semua tugas yang tidak lagi ada dalam file DAG yang diupdate akan ditandai sebagai dihapus.

Memperbarui DAG yang berjalan dengan jadwal yang sering

Setelah Anda mengupload file DAG, perlu waktu beberapa saat agar Airflow dapat memuat file ini dan mengupdate DAG. Jika DAG berjalan pada jadwal yang sering, Anda mungkin ingin memastikan bahwa DAG menggunakan file DAG versi terbaru. Untuk melakukannya:

  1. Jeda DAG di UI Airflow.
  2. Upload file DAG yang telah diperbarui.
  3. Tunggu hingga Anda melihat update di UI Airflow. Ini berarti DAG telah diuraikan dengan benar oleh penjadwal dan diperbarui di database Airflow.

    Jika UI Airflow menampilkan DAG yang telah diupdate, hal ini tidak menjamin bahwa pekerja Airflow memiliki versi file DAG yang telah diupdate. Hal ini terjadi karena file DAG disinkronkan secara independen untuk penjadwal dan pekerja.

  4. Anda mungkin ingin memperpanjang waktu tunggu untuk memastikan bahwa file DAG disinkronkan dengan semua pekerja di lingkungan Anda. Sinkronisasi terjadi beberapa kali per menit. Di lingkungan yang sehat, menunggu sekitar 20-30 detik sudah cukup untuk disinkronkan oleh semua pekerja.

  5. (Opsional) Jika Anda ingin benar-benar yakin bahwa semua pekerja memiliki file DAG versi baru, periksa log untuk setiap pekerja. Untuk melakukannya:

    1. Buka tab Logs untuk lingkungan Anda di konsol Google Cloud.

    2. Buka Composer logs > Infrastructure > Cloud Storage sync lalu periksa log untuk setiap worker di lingkungan Anda. Cari item log Syncing dags directory terbaru yang memiliki stempel waktu setelah Anda mengupload file DAG baru. Jika Anda melihat item Finished syncing yang mengikutinya, berarti DAG berhasil disinkronkan pada pekerja ini.

  6. Lanjutkan DAG.

Menghapus DAG

Bagian ini menjelaskan cara menghapus DAG.

Menghapus DAG di lingkungan Anda

Untuk menghapus DAG, hapus file .py Python untuk DAG dari folder /dags lingkungan di bucket lingkungan Anda.

Konsol

  1. Di konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  2. Dalam daftar lingkungan, temukan baris dengan nama lingkungan Anda, dan di kolom DAGs folder, klik link DAGs. Halaman Bucket details akan terbuka. Ini akan menampilkan isi folder /dags di bucket lingkungan Anda.

  3. Pilih file DAG, klik Delete, lalu konfirmasi operasi tersebut.

gcloud

gcloud composer environments storage dags delete \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    DAG_FILE

Ganti:

  • ENVIRONMENT_NAME dengan nama lingkungan.
  • LOCATION dengan region tempat lingkungan berada.
  • DAG_FILE dengan file .py Python untuk DAG.

Contoh:

gcloud composer environments storage dags delete \
    --environment example-environment \
    --location us-central1 \
    example_dag.py

Menghapus DAG dari UI Airflow

Anda dapat menghapus DAG dari UI Airflow di Airflow 1.10.0 atau versi yang lebih baru.

Untuk menghapus metadata DAG dari antarmuka web Airflow:

UI Airflow

  1. Buka UI Airflow untuk lingkungan Anda.
  2. Untuk DAG, klik Delete DAG.

gcloud

Pada Airflow 1 versi yang lebih lama dari 1.14.0, jalankan perintah berikut di gcloud CLI:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    delete_dag -- DAG_NAME

Di Airflow 2, Airflow 1.14.0 dan versi yang lebih baru, jalankan perintah berikut di gcloud CLI:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags delete -- DAG_NAME

Ganti:

  • ENVIRONMENT_NAME dengan nama lingkungan.
  • LOCATION dengan region tempat lingkungan berada.
  • DAG_NAME adalah nama DAG yang akan dihapus.

Langkah selanjutnya