Menggunakan CeleryKubernetesExecutor

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Halaman ini menjelaskan cara mengaktifkan CeleryKubernetesExecutor di Cloud Composer dan cara menggunakan KubernetesExecutor di DAG Anda.

Tentang CeleryKubernetesExecutor

CeleryKubernetesExecutor adalah jenis eksekutor yang dapat menggunakan CeleryExecutor dan KubernetesExecutor secara bersamaan. Airflow memilih eksekutor berdasarkan antrean yang Anda tentukan untuk tugas. Dalam satu DAG, Anda dapat menjalankan beberapa tugas dengan CeleryExecutor, dan tugas lainnya dengan KubernetesExecutor:

  • CeleryExecutor dioptimalkan untuk eksekusi tugas yang cepat dan skalabel.
  • KubernetesExecutor dirancang untuk menjalankan tugas yang membutuhkan banyak resource dan menjalankan tugas secara terpisah.

CeleryKubernetesExecutor di Cloud Composer

CeleryKubernetesExecutor di Cloud Composer memberikan kemampuan untuk menggunakan KubernetesExecutor untuk tugas Anda. Anda tidak dapat menggunakan KubernetesExecutor di Cloud Composer secara terpisah dari CeleryKubernetesExecutor.

Cloud Composer menjalankan tugas yang Anda jalankan dengan KubernetesExecutor di cluster lingkungan, dalam namespace yang sama dengan pekerja Airflow. Tugas tersebut memiliki binding yang sama dengan pekerja Airflow dan dapat mengakses resource dalam project Anda.

Tugas yang Anda jalankan dengan KubernetesExecutor menggunakan model penetapan harga Cloud Composer, karena pod dengan tugas ini berjalan di cluster lingkungan Anda. SKU Compute Cloud Composer (untuk CPU, Memori, dan Penyimpanan) berlaku untuk pod ini.

Sebaiknya jalankan tugas dengan CeleryExecutor saat:

  • Waktu mulai tugas sangat penting.
  • Tugas tidak memerlukan isolasi runtime dan tidak memerlukan banyak resource.

Sebaiknya jalankan tugas dengan KubernetesExecutor jika:

  • Tugas memerlukan isolasi runtime. Misalnya, agar tugas tidak bersaing untuk memori dan CPU, karena tugas tersebut berjalan di pod-nya sendiri.
  • Tugas memerlukan library sistem tambahan (atau paket PyPI).
  • Tugas membutuhkan banyak resource dan Anda ingin mengontrol resource CPU dan memori yang tersedia.

KubernetesExecutor dibandingkan dengan KubernetesPodOperator

Menjalankan tugas dengan KubernetesExecutor mirip dengan menjalankan tugas menggunakan KubernetesPodOperator. Tugas dieksekusi di pod, sehingga memberikan isolasi tugas tingkat pod dan pengelolaan resource yang lebih baik.

Namun, ada beberapa perbedaan utama:

  • KubernetesExecutor hanya menjalankan tugas di namespace Cloud Composer berversi lingkungan Anda. Anda tidak dapat mengubah namespace ini di Cloud Composer. Anda dapat menentukan namespace tempat KubernetesPodOperator menjalankan tugas pod.
  • KubernetesExecutor dapat menggunakan operator Airflow bawaan. KubernetesPodOperator hanya menjalankan skrip yang disediakan dan ditentukan oleh titik entri penampung.
  • KubernetesExecutor menggunakan image Docker Cloud Composer default dengan Python yang sama, penggantian opsi konfigurasi Airflow, variabel lingkungan, dan paket PyPI yang ditentukan di lingkungan Cloud Composer Anda.

Tentang image Docker

Secara default, KubernetesExecutor meluncurkan tugas menggunakan image Docker yang sama dengan yang digunakan Cloud Composer untuk pekerja Celery. Ini adalah image Cloud Composer untuk lingkungan Anda, dengan semua perubahan yang Anda tentukan untuk lingkungan Anda, seperti paket PyPI kustom atau variabel lingkungan.

Sebelum memulai

  • Anda dapat menggunakan CeleryKubernetesExecutor di Cloud Composer 3.

  • Anda tidak dapat menggunakan eksekutor selain CeleryKubernetesExecutor di Cloud Composer 3. Artinya, Anda dapat menjalankan tugas menggunakan CeleryExecutor, KubernetesExecutor, atau keduanya dalam satu DAG, tetapi Anda tidak dapat mengonfigurasi lingkungan untuk hanya menggunakan KubernetesExecutor atau CeleryExecutor.

Mengonfigurasi CeleryKubernetesExecutor

Sebaiknya ganti opsi konfigurasi Airflow yang ada dan terkait dengan KubernetesExecutor:

  • [kubernetes]worker_pods_creation_batch_size

    Opsi ini menentukan jumlah panggilan pembuatan Pod Pekerja Kubernetes per loop penjadwal. Nilai defaultnya adalah 1, sehingga hanya satu pod yang diluncurkan per heartbeat penjadwal. Jika Anda menggunakan KubernetesExecutor secara intensif, sebaiknya naikkan nilai ini.

  • [kubernetes]worker_pods_pending_timeout

    Opsi ini menentukan, dalam detik, berapa lama pekerja dapat berada dalam status Pending (Pod sedang dibuat) sebelum dianggap gagal. Nilai default-nya adalah 5 menit.

Menjalankan tugas dengan KubernetesExecutor atau CeleryExecutor

Anda dapat menjalankan tugas menggunakan CeleryExecutor, KubernetesExecutor, atau keduanya dalam satu DAG:

  • Untuk menjalankan tugas dengan KubernetesExecutor, tentukan nilai kubernetes dalam parameter queue tugas.
  • Untuk menjalankan tugas dengan CeleryExecutor, hapus parameter queue.

Contoh berikut menjalankan tugas task-kubernetes menggunakan KubernetesExecutor dan tugas task-celery menggunakan CeleryExecutor:

import datetime
import airflow
from airflow.operators.python_operator import PythonOperator

with airflow.DAG(
  "composer_sample_celery_kubernetes",
  start_date=datetime.datetime(2022, 1, 1),
  schedule_interval="@daily") as dag:

  def kubernetes_example():
      print("This task runs using KubernetesExecutor")

  def celery_example():
      print("This task runs using CeleryExecutor")

  # To run with KubernetesExecutor, set queue to kubernetes
  task_kubernetes = PythonOperator(
    task_id='task-kubernetes',
    python_callable=kubernetes_example,
    dag=dag,
    queue='kubernetes')

  # To run with CeleryExecutor, omit the queue argument
  task_celery = PythonOperator(
    task_id='task-celery',
    python_callable=celery_example,
    dag=dag)

  task_kubernetes >> task_celery

Menjalankan perintah CLI Airflow yang terkait dengan KubernetesExecutor

Anda dapat menjalankan beberapa perintah CLI Airflow yang terkait dengan KubernetesExecutor menggunakan gcloud.

Menyesuaikan spesifikasi pod pekerja

Anda dapat menyesuaikan spesifikasi pod pekerja dengan meneruskannya dalam parameter executor_config tugas. Anda dapat menggunakannya untuk menentukan persyaratan CPU dan memori kustom.

Anda dapat mengganti seluruh spesifikasi pod pekerja yang digunakan untuk menjalankan tugas. Untuk mengambil spesifikasi pod tugas yang digunakan oleh KubernetesExecutor, Anda dapat menjalankan perintah CLI Airflow kubernetes generate-dag-yaml.

Untuk informasi selengkapnya tentang cara menyesuaikan spesifikasi pod pekerja, lihat dokumentasi Airflow.

Contoh berikut menunjukkan tugas yang menggunakan spesifikasi pod pekerja kustom:

PythonOperator(
    task_id='custom-spec-example',
    python_callable=f,
    dag=dag,
    queue='kubernetes',
    executor_config={
        'pod_override': k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',
                        resources=k8s.V1ResourceRequirements(requests={
                            'cpu': '500m',
                            'memory': '1000Mi',
                        })
                    ),
                ],
            ),
        )
    },
)

Melihat log tugas

Log tugas yang dijalankan oleh KubernetesExecutor tersedia di tab Logs, bersama dengan log tugas yang dijalankan oleh CeleryExecutor:

  1. Di konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  2. Di daftar lingkungan, klik nama lingkungan Anda. Halaman Environment details akan terbuka.

  3. Buka tab Logs.

  4. Buka Semua log > Log Airflow > Pekerja.

  5. Pekerja bernama airflow-k8s-worker menjalankan tugas KubernetesExecutor. Untuk mencari log tugas tertentu, Anda dapat menggunakan ID DAG atau ID tugas sebagai kata kunci dalam penelusuran.

Langkah selanjutnya