Menggunakan CeleryKubernetesExecutor

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Halaman ini menjelaskan cara mengaktifkan CeleryKubernetesExecutor di Cloud Composer dan cara menggunakan KubernetesExecutor di DAG Anda.

Tentang CeleryKubernetesExecutor

CeleryKubernetesExecutor adalah jenis eksekutor yang dapat menggunakan CeleryExecutor dan KubernetesExecutor secara bersamaan baik. Airflow memilih eksekutor berdasarkan antrean yang Anda tentukan untuk tugas Anda. Dalam satu DAG, Anda dapat menjalankan beberapa tugas dengan CeleryExecutor, dan tugas lainnya dengan KubernetesExecutor:

  • CeleryExecutor dioptimalkan untuk eksekusi tugas yang cepat dan skalabel.
  • KubernetesExecutor dirancang untuk menjalankan tugas yang menggunakan banyak resource dan menjalankan tugas secara terpisah.

CeleryKubernetesExecutor di Cloud Composer

CeleryKubernetesExecutor di Cloud Composer memberikan kemampuan untuk menggunakan KubernetesExecutor untuk tugas Anda. Tidak mungkin menggunakan KubernetesExecutor di Cloud Composer secara terpisah dari CeleryKubernetesExecutor.

Cloud Composer menjalankan tugas yang Anda jalankan dengan KubernetesExecutor di cluster lingkungan Anda, di namespace yang sama dengan worker Airflow. Seperti tugas memiliki binding yang sama dengan Airflow pekerja dan dapat mengakses sumber daya dalam proyek Anda.

Tugas yang Anda jalankan dengan KubernetesExecutor akan menggunakan Model penetapan harga Cloud Composer, karena pod dengan tugas yang berjalan di cluster lingkungan Anda. SKU Cloud Composer Compute (untuk CPU, Memori, dan Penyimpanan) berlaku untuk pod ini.

Sebaiknya jalankan tugas dengan CeleryExecutor saat:

  • Waktu mulai tugas itu penting.
  • Tugas tidak memerlukan isolasi runtime dan tidak memerlukan banyak resource.

Sebaiknya jalankan tugas dengan KubernetesExecutor saat:

  • Tugas memerlukan isolasi runtime. Misalnya, agar tugas tidak bersaing untuk memori dan CPU, karena keduanya berjalan di pod mereka sendiri.
  • Tugas memerlukan library sistem tambahan (atau paket PyPI).
  • Tugas-tugas memerlukan banyak resource dan Anda ingin mengontrol resource yang tersedia resource CPU dan memori.

KubernetesExecutor dibandingkan dengan KubernetesPodOperator

Menjalankan tugas dengan KubernetesExecutor mirip dengan menjalankan tugas menggunakan KubernetesPodOperator. Tugas dijalankan dalam pod, sehingga menyediakan isolasi tugas tingkat pod dan pengelolaan resource yang lebih baik.

Namun, ada beberapa perbedaan utama:

  • KubernetesExecutor hanya menjalankan tugas di Cloud Composer berversi namespace. Namespace ini tidak dapat diubah di Cloud Composer. Anda dapat menentukan namespace tempat KubernetesPodOperator menjalankan tugas pod.
  • KubernetesExecutor dapat menggunakan operator Airflow bawaan. KubernetesPodOperator hanya mengeksekusi skrip yang disediakan yang ditentukan oleh titik entri container.
  • KubernetesExecutor menggunakan image Docker Cloud Composer default dengan Python yang sama, opsi konfigurasi Airflow menggantikan, variabel, dan paket PyPI yang ditentukan dalam lingkungan Cloud Composer.

Tentang image Docker

Secara default, KubernetesExecutor meluncurkan tugas menggunakan image Docker yang sama dengan Cloud Composer untuk pekerja Celery. Ini adalah Image Cloud Composer untuk lingkungan Anda, dengan semua perubahan yang Anda tentukan untuk lingkungan, seperti PyPI kustom paket atau variabel lingkungan.

Sebelum memulai

  • Anda dapat menggunakan CeleryKubernetesExecutor di Cloud Composer 3.

  • Tidak mungkin menggunakan eksekutor selain CeleryKubernetesExecutor di Cloud Composer 3. Ini berarti Anda dapat menjalankan tugas menggunakan CeleryExecutor, KubernetesExecutor atau keduanya dalam satu DAG, tetapi tidak dapat mengonfigurasi lingkungan Anda agar hanya menggunakan KubernetesExecutor atau Eksekutor Seledri.

Mengonfigurasi CeleryKubernetesExecutor

Anda mungkin ingin mengganti konfigurasi Airflow yang ada opsi yang terkait dengan KubernetesExecutor:

  • [kubernetes]worker_pods_creation_batch_size

    Opsi ini menentukan jumlah panggilan pembuatan Pod Pekerja Kubernetes per {i>scheduler<i}. Nilai defaultnya adalah 1, sehingga hanya satu pod yang diluncurkan setiap detak jantung penjadwal. Jika Anda sering menggunakan KubernetesExecutor, disarankan untuk meningkatkan nilai ini.

  • [kubernetes]worker_pods_pending_timeout

    Opsi ini menentukan, dalam detik, berapa lama pekerja dapat tetap berada di Pending (Pod sedang dibuat) sebelum dianggap gagal. Default nilainya adalah 5 menit.

Menjalankan tugas dengan KubernetesExecutor atau CeleryExecutor

Anda dapat menjalankan tugas menggunakan CeleryExecutor, KubernetesExecutor, atau keduanya dalam satu DAG:

  • Untuk menjalankan tugas dengan KubernetesExecutor, tentukan nilai kubernetes di Parameter queue tugas.
  • Untuk menjalankan tugas dengan CeleryExecutor, hapus parameter queue.

Contoh berikut menjalankan tugas task-kubernetes menggunakan KubernetesExecutor dan tugas task-celery menggunakan CeleryExecutor:

import datetime
import airflow
from airflow.operators.python_operator import PythonOperator

with airflow.DAG(
  "composer_sample_celery_kubernetes",
  start_date=datetime.datetime(2022, 1, 1),
  schedule_interval="@daily") as dag:

  def kubernetes_example():
      print("This task runs using KubernetesExecutor")

  def celery_example():
      print("This task runs using CeleryExecutor")

  # To run with KubernetesExecutor, set queue to kubernetes
  task_kubernetes = PythonOperator(
    task_id='task-kubernetes',
    python_callable=kubernetes_example,
    dag=dag,
    queue='kubernetes')

  # To run with CeleryExecutor, omit the queue argument
  task_celery = PythonOperator(
    task_id='task-celery',
    python_callable=celery_example,
    dag=dag)

  task_kubernetes >> task_celery

Menjalankan perintah CLI Airflow yang terkait dengan KubernetesExecutor

Anda dapat menjalankan beberapa Perintah CLI Airflow yang terkait dengan KubernetesExecutor menggunakan gcloud.

Menyesuaikan spesifikasi pod pekerja

Anda dapat menyesuaikan spesifikasi pod pekerja dengan meneruskannya dalam executor_config parameter tugas. Anda dapat menggunakannya untuk menentukan memori dan CPU kustom lainnya.

Anda dapat mengganti seluruh spesifikasi pod pekerja yang digunakan untuk menjalankan tugas. Kepada mengambil spesifikasi pod tugas yang digunakan oleh KubernetesExecutor, Anda dapat jalankan kubernetes generate-dag-yaml Airflow CLI perintah.

Untuk informasi selengkapnya tentang menyesuaikan spesifikasi pod pekerja, lihat Dokumentasi Airflow.

Contoh berikut menunjukkan tugas yang menggunakan spesifikasi pod pekerja kustom:

PythonOperator(
    task_id='custom-spec-example',
    python_callable=f,
    dag=dag,
    queue='kubernetes',
    executor_config={
        'pod_override': k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',
                        resources=k8s.V1ResourceRequirements(requests={
                            'cpu': '500m',
                            'memory': '1000Mi',
                        })
                    ),
                ],
            ),
        )
    },
)

Lihat log tugas

Log tugas yang dijalankan oleh KubernetesExecutor tersedia di tab Logs, bersama dengan log tugas yang dijalankan oleh CeleryExecutor:

  1. Di Konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  2. Pada daftar lingkungan, klik nama lingkungan Anda. Halaman Detail lingkungan akan terbuka.

  3. Buka tab Logs.

  4. Buka All logs &gt; Airflow logs &gt; Pekerja.

  5. Pekerja bernama airflow-k8s-worker mengeksekusi Tugas KubernetesExecutor. Untuk mencari log tentang tugas tertentu, Anda bisa gunakan ID DAG atau ID tugas sebagai kata kunci dalam penelusuran.

Langkah selanjutnya