Menjalankan DAG Apache Airflow di Cloud Composer 3

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Panduan memulai ini menunjukkan cara membuat lingkungan Cloud Composer dan menjalankan DAG Apache Airflow di Cloud Composer 3.

Sebelum memulai

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Cloud Composer API.

    Enable the API

  7. Untuk mendapatkan izin yang Anda perlukan untuk menyelesaikan panduan memulai ini, minta administrator untuk memberi Anda peran IAM berikut di project Anda:

    Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.

    Anda mungkin juga bisa mendapatkan izin yang diperlukan melalui peran khusus atau peran bawaan lainnya.

Membuat akun layanan lingkungan

Saat membuat lingkungan, Anda harus menentukan akun layanan. Akun layanan ini disebut akun layanan lingkungan. Lingkungan Anda menggunakan akun layanan ini untuk melakukan sebagian besar operasi.

Akun layanan untuk lingkungan Anda bukan akun pengguna. Akun layanan adalah jenis akun khusus yang digunakan oleh aplikasi atau instance virtual machine (VM), bukan orang.

Untuk membuat akun layanan bagi lingkungan Anda:

  1. Buat akun layanan baru, seperti yang dijelaskan dalam dokumentasi Identity and Access Management.

  2. Berikan peran ke akun tersebut, seperti yang dijelaskan dalam dokumentasi Identity and Access Management. Peran yang diperlukan adalah Pekerja Composer (composer.worker).

Membuat lingkungan

  1. Di konsol Google Cloud, buka halaman Create environment.

    Buka Buat lingkungan

  2. Di kolom Name, masukkan example-environment.

  3. Di menu drop-down Location, pilih region untuk lingkungan Cloud Composer. Panduan ini menggunakan region us-central1.

  4. Untuk opsi konfigurasi lingkungan lainnya, gunakan default yang disediakan.

  5. Klik Create dan tunggu hingga lingkungan dibuat.

  6. Setelah selesai, tanda centang hijau akan ditampilkan di samping nama lingkungan.

Membuat file DAG

DAG Airflow adalah kumpulan tugas terorganisir yang ingin Anda jadwalkan dan jalankan. DAG ditentukan dalam file Python standar.

Panduan ini menggunakan contoh DAG Airflow yang ditentukan dalam file quickstart.py. Kode Python dalam file ini melakukan hal berikut:

  1. Membuat DAG, composer_sample_dag. DAG ini berjalan setiap hari.
  2. Menjalankan satu tugas, print_dag_run_conf. Tugas ini mencetak konfigurasi eksekusi DAG menggunakan operator bash.

Simpan salinan file quickstart.py di komputer lokal Anda:

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

Upload file DAG ke bucket lingkungan Anda

Setiap lingkungan Cloud Composer memiliki bucket Cloud Storage yang terkait. Airflow di Cloud Composer hanya menjadwalkan DAG yang berada di folder /dags di bucket ini.

Untuk menjadwalkan DAG, upload quickstart.py dari komputer lokal ke folder /dags lingkungan Anda:

  1. Di konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  2. Dalam daftar lingkungan, klik nama lingkungan Anda, example-environment. Halaman Environment details akan terbuka.

  3. Klik Buka folder DAG. Halaman Detail bucket akan terbuka.

  4. Klik Upload file, lalu pilih salinan quickstart.py Anda.

  5. Untuk mengupload file, klik Buka.

Melihat DAG

Setelah Anda mengupload file DAG, Airflow akan melakukan hal berikut:

  1. Mengurai file DAG yang Anda upload. Mungkin perlu waktu beberapa menit hingga DAG tersedia untuk Airflow.
  2. Menambahkan DAG ke daftar DAG yang tersedia.
  3. Menjalankan DAG sesuai dengan jadwal yang Anda berikan dalam file DAG.

Pastikan DAG Anda diproses tanpa error dan tersedia di Airflow dengan melihatnya di UI DAG. UI DAG adalah antarmuka Cloud Composer untuk melihat informasi DAG di konsol Google Cloud. Cloud Composer juga menyediakan akses ke UI Airflow, yang merupakan antarmuka web Airflow native.

  1. Tunggu sekitar lima menit untuk memberi Airflow waktu guna memproses file DAG yang Anda upload sebelumnya, dan menyelesaikan operasi DAG pertama (akan dijelaskan nanti).

  2. Di konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  3. Dalam daftar lingkungan, klik nama lingkungan Anda, example-environment. Halaman Environment details akan terbuka.

  4. Buka tab DAG.

  5. Pastikan DAG composer_quickstart ada dalam daftar DAG.

    Daftar DAG menampilkan DAG composer_quickstart dengan
    informasi tambahan seperti status dan jadwal
    Gambar 1. Daftar DAG menampilkan DAG composer_quickstart (klik untuk memperbesar)

Melihat detail operasi DAG

Satu eksekusi DAG disebut eksekusi DAG. Airflow segera menjalankan DAG untuk contoh DAG karena tanggal mulai dalam file DAG ditetapkan ke kemarin. Dengan cara ini, Airflow akan mengejar jadwal DAG yang ditentukan.

Contoh DAG berisi satu tugas, print_dag_run_conf, yang menjalankan perintah echo di konsol. Perintah ini menghasilkan informasi meta tentang DAG (ID numerik DAG run).

  1. Di tab DAG, klik composer_quickstart. Tab Runs untuk DAG akan terbuka.

  2. Dalam daftar DAG yang berjalan, klik entri pertama.

    Daftar operasi DAG menampilkan operasi DAG terbaru (tanggal dan status eksekusi)
    Gambar 2. Daftar DAG yang dijalankan untuk DAG composer_quickstart (klik untuk memperbesar)
  3. Detail operasi DAG ditampilkan, yang menjelaskan informasi tentang setiap tugas dari contoh DAG.

    Daftar tugas dengan entri print_dag_run_conf, waktu mulai, waktu akhir, dan durasi tugas
    Gambar 3. Daftar tugas yang dieksekusi dalam operasi DAG (klik untuk memperbesar)
  4. Bagian Log untuk operasi DAG mencantumkan log untuk semua tugas dalam operasi DAG. Anda dapat melihat output perintah echo di log.

    Entri log tugas, salah satunya adalah Output dan yang lainnya mencantumkan
    ID
    Gambar 4. Log tugas print_dag_run_conf (klik untuk memperbesar)

Pembersihan

Agar tidak menimbulkan biaya pada akun Google Cloud Anda untuk resource yang digunakan pada halaman ini, ikuti langkah-langkah berikut.

Hapus resource yang digunakan dalam tutorial ini:

  1. Hapus lingkungan Cloud Composer:

    1. Di konsol Google Cloud, buka halaman Environments.

      Buka Lingkungan

    2. Pilih example-environment, lalu klik Hapus.

    3. Tunggu hingga lingkungan dihapus.

  2. Hapus bucket lingkungan Anda. Menghapus lingkungan Cloud Composer tidak akan menghapus bucket-nya.

    1. Di konsol Google Cloud, buka halaman Storage > Browser.

      Buka Penyimpanan > Browser

    2. Pilih bucket lingkungan, lalu klik Hapus. Misalnya, bucket ini dapat diberi nama us-central1-example-environ-c1616fe8-bucket.

  3. Hapus persistent disk antrean Redis lingkungan Anda. Menghapus lingkungan Cloud Composer tidak akan menghapus persistent disk-nya.

    1. Di konsol Google Cloud, buka Compute Engine > Disks.

      Buka Disk

    2. Pilih persistent disk antrean Redis lingkungan, lalu klik Hapus.

      Misalnya, disk ini dapat diberi nama gke-us-central1-exampl-pvc-b12055b6-c92c-43ff-9de9-10f2cc6fc0ee. Disk untuk Cloud Composer 1 selalu memiliki jenis Standard persistent disk dan ukuran 2 GB.

Langkah selanjutnya