Menjalankan DAG Apache Airflow di Cloud Composer 1

Cloud Composer 1 | Cloud Composer 2

Halaman ini menunjukkan cara membuat lingkungan Cloud Composer dan menjalankan DAG Apache Airflow di Cloud Composer.

Jika Anda baru mengenal Airflow, lihat tutorial ini untuk mengetahui informasi selengkapnya tentang konsep, objek, dan penggunaannya di Airflow.

Sebelum memulai

  1. Login ke akun Google Cloud Anda. Jika Anda baru menggunakan Google Cloud, buat akun untuk mengevaluasi performa produk kami dalam skenario dunia nyata. Pelanggan baru juga mendapatkan kredit gratis senilai $300 untuk menjalankan, menguji, dan men-deploy workload.
  2. Di konsol Google Cloud, pada halaman pemilih project, pilih atau buat project Google Cloud.

    Buka pemilih project

  3. Pastikan penagihan telah diaktifkan untuk project Google Cloud Anda.

  4. Di konsol Google Cloud, pada halaman pemilih project, pilih atau buat project Google Cloud.

    Buka pemilih project

  5. Pastikan penagihan telah diaktifkan untuk project Google Cloud Anda.

  6. Aktifkan API Cloud Composer.

    Mengaktifkan API

Membuat lingkungan

Konsol

  1. Di konsol Google Cloud, buka halaman Create environment.

    Buka Buat lingkungan

  2. Di kolom Name, masukkan example-environment.

  3. Di menu drop-down Lokasi, pilih region untuk lingkungan Cloud Composer. Lihat Wilayah yang tersedia untuk mengetahui informasi tentang cara memilih wilayah.

  4. Untuk opsi konfigurasi lingkungan lainnya, gunakan setelan default yang diberikan.

  5. Untuk membuat lingkungan, klik Buat.

  6. Tunggu hingga lingkungan dibuat. Setelah selesai, tanda centang hijau akan muncul di samping nama lingkungan.

gcloud

Tambahkan akun Agen Layanan Cloud Composer sebagai akun utama baru di akun layanan lingkungan Anda dan berikan peran Cloud Composer v2 API Service Agent Extension (roles/composer.ServiceAgentV2Ext) ke akun tersebut.

Secara default, lingkungan Anda menggunakan akun layanan Compute Engine default.

# Get current project's project number
PROJECT_NUMBER=$(gcloud projects list \
  --filter="$(gcloud config get-value project)" \
  --format="value(PROJECT_NUMBER)" \
  --limit=1)

# Add the Cloud Composer v2 API Service Agent Extension role
gcloud iam service-accounts add-iam-policy-binding \
    $PROJECT_NUMBER-compute@developer.gserviceaccount.com \
    --member serviceAccount:service-$PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com \
    --role roles/composer.ServiceAgentV2Ext

Buat lingkungan baru:

gcloud composer environments create ENVIRONMENT_NAME \
  --location LOCATION \
  --image-version IMAGE_VERSION

Ganti:

  • ENVIRONMENT_NAME dengan nama lingkungan. Panduan memulai ini menggunakan example-environment.
  • LOCATION dengan region untuk lingkungan Cloud Composer. Lihat Wilayah yang tersedia untuk mengetahui informasi tentang cara memilih wilayah.
  • IMAGE_VERSION dengan nama image Cloud Composer. Panduan ini menggunakan composer-1.20.12-airflow-1.10.15 untuk membuat lingkungan dengan image Cloud Composer terbaru.

Contoh:

gcloud composer environments create example-environment \
  --location us-central1 \
  --image-version composer-1.20.12-airflow-1.10.15

Terraform

Untuk mengonfigurasi lingkungan ini menggunakan Terraform, tambahkan blok resource berikut ke konfigurasi Terraform Anda, lalu jalankan terraform apply.

Untuk memanfaatkan blok resource ini, akun layanan menggunakan Terraform harus memiliki peran dengan izin composer.environments.create yang diaktifkan. Untuk mengetahui informasi selengkapnya tentang akun layanan untuk Terraform, lihat Referensi Konfigurasi Penyedia Google.

Untuk mengetahui informasi lebih lanjut mengenai cara menggunakan Terraform untuk membuat lingkungan Cloud Composer, lihat dokumentasi Terraform.

resource "google_composer_environment" "example" {
  name = "ENVIRONMENT_NAME"
  region = "LOCATION"

  config {
    software_config {
      image_version = "IMAGE_VERSION"
    }
  }
}
  • ENVIRONMENT_NAME dengan nama lingkungan. Panduan memulai ini menggunakan example-environment.

  • LOCATION dengan region untuk lingkungan Cloud Composer. Lihat Wilayah yang tersedia untuk mengetahui informasi tentang cara memilih wilayah.

  • IMAGE_VERSION dengan nama image Cloud Composer. Panduan ini menggunakan composer-1.20.12-airflow-1.10.15 untuk membuat lingkungan dengan image Cloud Composer terbaru.

Contoh:

resource "google_composer_environment" "example" {
  name = "example-environment"
  region = "us-central1"

  config {
    software_config {
      image_version = "composer-1.20.12-airflow-1.10.15"
    }
  }

}

Lihat detail lingkungan

Setelah pembuatan lingkungan selesai, Anda dapat melihat informasi lingkungan, seperti versi Cloud Composer, URL untuk antarmuka web Airflow, dan folder DAG di Cloud Storage.

Untuk melihat informasi lingkungan:

  1. Di konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  2. Untuk melihat halaman Environment details, klik nama lingkungan Anda, example-environment.

Membuat DAG

Airflow DAG adalah kumpulan tugas terorganisir yang ingin Anda jadwalkan dan jalankan. DAG ditentukan dalam file Python standar.

Kode Python di quickstart.py:

  1. Membuat DAG, composer_sample_dag. DAG berjalan sekali per hari.
  2. Mengeksekusi satu tugas, print_dag_run_conf. Tugas ini mencetak konfigurasi run DAG menggunakan operator bash.

Untuk membuat DAG, buat salinan file quickstart.py di komputer lokal Anda.

Aliran udara 1

import datetime

import airflow
from airflow.operators import bash_operator

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with airflow.DAG(
    "composer_sample_dag",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash_operator.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

Aliran udara 2

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

Mengupload DAG ke Cloud Storage

Cloud Composer hanya menjadwalkan DAG yang berada di folder /dags di bucket Cloud Storage lingkungan.

Untuk menjadwalkan DAG, upload quickstart.py dari mesin lokal Anda ke folder /dags lingkungan Anda.

Konsol

  1. Di konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  2. Untuk membuka folder /dags, ikuti link folder DAG untuk example-environment.

  3. Di halaman detail Bucket, klik Upload files, lalu pilih salinan lokal quickstart.py Anda.

  4. Untuk mengupload file, klik Buka.

    Setelah Anda mengupload DAG, Cloud Composer akan menambahkan DAG ke Airflow dan segera menjadwalkan pengoperasian DAG. Mungkin perlu waktu beberapa menit hingga DAG muncul di antarmuka web Airflow.

gcloud

Untuk mengupload quickstart.py dengan gcloud, jalankan perintah berikut:

gcloud composer environments storage dags import \
--environment example-environment  --location us-central1 \
--source quickstart.py

Melihat DAG di UI Airflow

Setiap lingkungan Cloud Composer memiliki server web yang menjalankan antarmuka web Airflow. Anda dapat mengelola DAG dari antarmuka web Airflow.

Untuk melihat DAG di antarmuka web Airflow:

Aliran udara 1

  1. Di konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  2. Untuk membuka antarmuka web Airflow, klik link Airflow untuk example-environment. UI Airflow akan terbuka di jendela browser baru.

  3. Di toolbar Airflow, buka halaman DAGs.

  4. Untuk membuka halaman detail DAG, klik composer_sample_dag.

    Halaman DAG di UI Airflow
    Gambar 1. Halaman DAG di UI Airflow (klik untuk memperbesar)

    Halaman untuk DAG menampilkan Tree View, representasi grafis dari tugas dan dependensi alur kerja.

    Tampilan hierarki untuk DAG composer_sample_dags
    Gambar 2. Tampilan hierarki untuk DAG composer_sample_dags

Aliran udara 2

  1. Di konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  2. Untuk membuka antarmuka web Airflow, klik link Airflow untuk example-environment. UI Airflow akan terbuka di jendela browser baru.

  3. Di toolbar Airflow, buka halaman DAGs.

  4. Untuk membuka halaman detail DAG, klik composer_sample_dag.

    Halaman DAG di UI Airflow
    Gambar 1. Halaman DAG di UI Airflow (klik untuk memperbesar)

    Halaman untuk DAG menampilkan Tree View, representasi grafis dari tugas dan dependensi alur kerja.

    Tampilan hierarki untuk DAG composer_sample_dags
    Gambar 2. Tampilan hierarki untuk DAG composer_sample_dags

Melihat detail instance tugas di log Airflow

DAG yang Anda jadwalkan mencakup tugas print_dag_run_conf. Tugas tersebut akan mencetak konfigurasi run DAG, yang dapat Anda lihat di log Airflow untuk instance tugas.

Untuk melihat detail instance tugas:

Aliran udara 1

  1. Dalam Tree View DAG di antarmuka web Airflow, klik Graph View.

    Jika Anda menahan pointer ke tugas print_dag_run_conf, statusnya akan ditampilkan.

    Tampilan hierarki untuk DAG composer_sample_dags
    Gambar 3. Status tugas print_dag_run_conf
  2. Klik tugas print_dag_run_conf.

    Di menu konteks Task Instance, Anda bisa mendapatkan metadata dan melakukan beberapa tindakan.

    Menu konteks Instance Tugas untuk tugas composer_sample_dags
    Gambar 4. Menu konteks Instance Tugas untuk tugas composer_sample_dags
  3. Di menu konteks Task Instance, klik View Log.

  4. Dalam Log, cari Running: ['bash' untuk melihat output dari operator bash.

    Output log operator Bash
    Gambar 5. Output log operator Bash

Aliran udara 2

  1. Dalam Tree View DAG di antarmuka web Airflow, klik Graph View.

    Jika Anda menahan pointer ke tugas print_dag_run_conf, statusnya akan ditampilkan.

    Tampilan hierarki untuk DAG composer_sample_dags
    Gambar 3. Status tugas print_dag_run_conf
  2. Klik tugas print_dag_run_conf.

    Di menu konteks Task Instance, Anda bisa mendapatkan metadata dan melakukan beberapa tindakan.

    Menu konteks Instance Tugas untuk tugas composer_sample_dags
    Gambar 4. Menu konteks Instance Tugas untuk tugas composer_sample_dags
  3. Di menu konteks Task Instance, klik Log.

  4. Dalam log, cari Running command: ['bash' untuk melihat output dari operator bash.

    [2021-10-04 15:27:21,029] {subprocess.py:63} INFO - Running command:
    ['bash', '-c', 'echo 735']
    [2021-10-04 15:27:21,167] {subprocess.py:74} INFO - Output:
    [2021-10-04 15:27:21,168] {subprocess.py:78} INFO - 735
    [2021-10-04 15:27:21,168] {subprocess.py:82} INFO - Command exited with
    return code 0
    

Pembersihan

Agar akun Google Cloud Anda tidak dikenakan biaya untuk resource yang digunakan pada halaman ini, ikuti langkah-langkah berikut.

Hapus resource yang digunakan dalam tutorial ini:

  1. Hapus lingkungan Cloud Composer:

    1. Di konsol Google Cloud, buka halaman Environments.

      Buka Lingkungan

    2. Pilih example-environment, lalu klik Hapus.

    3. Tunggu hingga lingkungan dihapus.

  2. Hapus bucket lingkungan Anda. Menghapus lingkungan Cloud Composer tidak akan menghapus bucket-nya.

    1. Di konsol Google Cloud, buka halaman Storage > Browser.

      Buka Storage > Browser

    2. Pilih bucket lingkungan, lalu klik Hapus. Misalnya, bucket ini dapat diberi nama us-central1-example-environ-c1616fe8-bucket.

  3. Hapus persistent disk dari antrean Redis lingkungan Anda. Menghapus lingkungan Cloud Composer tidak akan menghapus persistent disk-nya.

    1. Di Konsol Google Cloud, buka Compute Engine > Disks.

      Buka Disk

    2. Pilih persistent disk antrean Redis lingkungan, lalu klik Delete.

      Misalnya, disk ini dapat diberi nama gke-us-central1-exampl-pvc-b12055b6-c92c-43ff-9de9-10f2cc6fc0ee. Disk untuk Cloud Composer 1 selalu memiliki jenis Standard persistent disk dan ukuran 2 GB.

Langkah selanjutnya