Menjalankan DAG Apache Airflow di Cloud Composer 3

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Panduan memulai ini menunjukkan cara membuat lingkungan Cloud Composer dan menjalankan DAG Apache Airflow di Cloud Composer 3.

Sebelum memulai

  1. Login ke akun Google Cloud Anda. Jika Anda baru menggunakan Google Cloud, buat akun untuk mengevaluasi performa produk kami dalam skenario dunia nyata. Pelanggan baru juga mendapatkan kredit gratis senilai $300 untuk menjalankan, menguji, dan men-deploy workload.
  2. Di konsol Google Cloud, pada halaman pemilih project, pilih atau buat project Google Cloud.

    Buka pemilih project

  3. Pastikan penagihan telah diaktifkan untuk project Google Cloud Anda.

  4. Di konsol Google Cloud, pada halaman pemilih project, pilih atau buat project Google Cloud.

    Buka pemilih project

  5. Pastikan penagihan telah diaktifkan untuk project Google Cloud Anda.

  6. Aktifkan API Cloud Composer.

    Mengaktifkan API

  7. Untuk mendapatkan izin yang diperlukan dalam menyelesaikan panduan memulai ini, minta administrator untuk memberi Anda peran IAM berikut pada project Anda:

    Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.

    Anda mungkin juga bisa mendapatkan izin yang diperlukan melalui perintah peran atau setelan standar lainnya peran tertentu.

Membuat lingkungan

  1. Di konsol Google Cloud, buka halaman Create environment.

    Buka Buat lingkungan

  2. Di kolom Name, masukkan example-environment.

  3. Di menu drop-down Lokasi, pilih wilayah untuk lingkungan Cloud Composer. Panduan ini menggunakan region us-central1.

  4. Untuk opsi konfigurasi lingkungan lainnya, gunakan setelan default yang disediakan.

  5. Klik Create dan tunggu hingga lingkungan dibuat.

  6. Setelah selesai, tanda centang hijau akan ditampilkan di samping nama lingkungan.

Membuat file DAG

DAG Airflow adalah kumpulan tugas terorganisir yang yang ingin Anda jadwalkan dan jalankan. DAG ditentukan dalam file Python standar.

Panduan ini menggunakan contoh DAG Airflow yang ditentukan dalam file quickstart.py. Kode Python dalam file ini melakukan hal berikut:

  1. Membuat DAG, composer_sample_dag. DAG ini berjalan setiap hari.
  2. Menjalankan satu tugas, print_dag_run_conf. Tugas ini mencetak proses DAG konfigurasinya dengan menggunakan operator {i>bash<i}.

Simpan salinan file quickstart.py di komputer lokal Anda:

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

Mengupload file DAG ke bucket lingkungan Anda

Setiap lingkungan Cloud Composer memiliki Cloud Storage yang terkait dengannya. Airflow di Cloud Composer hanya menjadwalkan DAG yang berada di folder /dags di bucket ini.

Untuk menjadwalkan DAG, upload quickstart.py dari komputer lokal Anda ke folder /dags lingkungan:

  1. Di Konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  2. Dalam daftar lingkungan, klik nama lingkungan Anda, example-environment. Halaman Detail lingkungan akan terbuka.

  3. Klik Open DAGs folder. Halaman Bucket details akan terbuka.

  4. Klik Upload files lalu pilih salinan quickstart.py.

  5. Untuk mengupload file, klik Open.

Melihat DAG

Setelah Anda mengupload file DAG, Airflow akan melakukan hal berikut:

  1. Mengurai file DAG yang Anda upload. Mungkin perlu waktu beberapa menit hingga DAG tersedia untuk Airflow.
  2. Menambahkan DAG ke daftar DAG yang tersedia.
  3. Mengeksekusi DAG sesuai dengan jadwal yang Anda berikan di file DAG.

Pastikan DAG Anda diproses tanpa error dan tersedia di Airflow dengan melihatnya di UI DAG. UI DAG adalah antarmuka Cloud Composer untuk melihat Informasi DAG di Konsol Google Cloud. Cloud Composer juga menyediakan akses ke UI Airflow, yang merupakan antarmuka web Airflow native.

  1. Tunggu sekitar lima menit guna memberi Airflow waktu untuk memproses file DAG yang telah Anda upload sebelumnya, dan untuk menyelesaikan DAG pertama, (akan dijelaskan nanti).

  2. Di Konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  3. Dalam daftar lingkungan, klik nama lingkungan Anda, example-environment. Halaman Detail lingkungan akan terbuka.

  4. Buka tab DAGs.

  5. Memastikan DAG composer_quickstart ada dalam daftar DAG.

    Daftar DAG menampilkan DAG composer_quickstart dengan
    informasi tambahan seperti status dan jadwal
    Gambar 1. Daftar DAG menampilkan composer_quickstart DAG (klik untuk memperbesar)

Melihat detail operasi DAG

Satu eksekusi DAG disebut eksekusi DAG. Aliran udara segera menjalankan DAG untuk contoh DAG karena tanggal mulai di file DAG adalah ditetapkan ke kemarin. Dengan cara ini, Airflow akan mengejar jadwal proyek.

Contoh DAG berisi satu tugas, print_dag_run_conf, yang menjalankan echo di konsol. Perintah ini menghasilkan informasi meta tentang DAG (ID numerik operasi DAG).

  1. Di tab DAGs, klik composer_quickstart. Tab Runs untuk DAG akan terbuka.

  2. Dalam daftar operasi DAG, klik entri pertama.

    Daftar operasi DAG menunjukkan operasi DAG baru-baru ini (tanggal eksekusinya
    dan status)
    Gambar 2. Daftar DAG yang dijalankan untuk composer_quickstart DAG (klik untuk memperbesar)
  3. Detail proses DAG ditampilkan, yang memerinci informasi tentang pada contoh DAG.

    Daftar tugas dengan entri print_dag_run_conf, awalannya
    waktu, waktu berakhir, dan durasi
    Gambar 3. Daftar tugas yang dieksekusi di DAG berjalan (klik untuk memperbesar)
  4. Bagian Logs for DAG run mencantumkan log untuk semua tugas di DAG yang dijalankan. Anda dapat melihat output perintah echo di log.

    Entri log tugas, salah satunya adalah Output dan daftar lainnya
    ID
    Gambar 4. Log tugas print_dag_run_conf (klik untuk memperbesar)

Pembersihan

Agar tidak menimbulkan biaya pada akun Google Cloud Anda untuk resource yang digunakan pada halaman ini, ikuti langkah-langkah berikut.

Hapus resource yang digunakan dalam tutorial ini:

  1. Hapus lingkungan Cloud Composer:

    1. Di Konsol Google Cloud, buka halaman Environments.

      Buka Lingkungan

    2. Pilih example-environment, lalu klik Hapus.

    3. Tunggu hingga lingkungan dihapus.

  2. Hapus bucket lingkungan Anda. Menghapus Cloud Composer tidak akan menghapus bucket-nya.

    1. Di Konsol Google Cloud, buka Storage > Browser.

      Buka Storage > Browser

    2. Pilih bucket lingkungan dan klik Delete. Misalnya, bucket dapat diberi nama us-central1-example-environ-c1616fe8-bucket.

  3. Hapus persistent disk dari antrean Redis lingkungan Anda. Menghapus Lingkungan Cloud Composer tidak menghapus persistent disk-nya.

    1. Di konsol Google Cloud, buka Compute Engine > Disks.

      Buka Disk

    2. Pilih persistent disk antrean Redis lingkungan lingkungan Anda dan klik Hapus.

      Misalnya, {i>disk<i} ini dapat diberi nama gke-us-central1-exampl-pvc-b12055b6-c92c-43ff-9de9-10f2cc6fc0ee. Disk untuk Cloud Composer 1 selalu memiliki Standard persistent disk jenis dan ukuran 2 GB.

Langkah selanjutnya