Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Halaman ini menjelaskan cara mengaktifkan CeleryKubernetesExecutor di Cloud Composer dan cara menggunakan KubernetesExecutor di DAG Anda.
Tentang CeleryKubernetesExecutor
CeleryKubernetesExecutor adalah jenis eksekutor yang dapat menggunakan CeleryExecutor dan KubernetesExecutor secara bersamaan baik. Airflow memilih eksekutor berdasarkan antrean yang Anda tentukan untuk tugas Anda. Dalam satu DAG, Anda dapat menjalankan beberapa tugas dengan CeleryExecutor, dan tugas lainnya dengan KubernetesExecutor:
- CeleryExecutor dioptimalkan untuk eksekusi tugas yang cepat dan skalabel.
- KubernetesExecutor dirancang untuk menjalankan tugas yang menggunakan banyak resource dan menjalankan tugas secara terpisah.
CeleryKubernetesExecutor di Cloud Composer
CeleryKubernetesExecutor di Cloud Composer memberikan kemampuan untuk menggunakan KubernetesExecutor untuk tugas Anda. Tidak mungkin menggunakan KubernetesExecutor di Cloud Composer secara terpisah dari CeleryKubernetesExecutor.
Cloud Composer menjalankan tugas yang Anda jalankan dengan KubernetesExecutor di cluster lingkungan Anda, di namespace yang sama dengan worker Airflow. Seperti tugas memiliki binding yang sama dengan Airflow pekerja dan dapat mengakses sumber daya dalam proyek Anda.
Tugas yang Anda jalankan dengan KubernetesExecutor akan menggunakan Model penetapan harga Cloud Composer, karena pod dengan tugas yang berjalan di cluster lingkungan Anda. SKU Cloud Composer Compute (untuk CPU, Memori, dan Penyimpanan) berlaku untuk pod ini.
Sebaiknya jalankan tugas dengan CeleryExecutor saat:
- Waktu mulai tugas itu penting.
- Tugas tidak memerlukan isolasi runtime dan tidak memerlukan banyak resource.
Sebaiknya jalankan tugas dengan KubernetesExecutor saat:
- Tugas memerlukan isolasi runtime. Misalnya, agar tugas tidak bersaing untuk memori dan CPU, karena keduanya berjalan di pod mereka sendiri.
- Tugas memerlukan library sistem tambahan (atau paket PyPI).
- Tugas-tugas memerlukan banyak resource dan Anda ingin mengontrol resource yang tersedia resource CPU dan memori.
KubernetesExecutor dibandingkan dengan KubernetesPodOperator
Menjalankan tugas dengan KubernetesExecutor mirip dengan menjalankan tugas menggunakan KubernetesPodOperator. Tugas dijalankan dalam pod, sehingga menyediakan isolasi tugas tingkat pod dan pengelolaan resource yang lebih baik.
Namun, ada beberapa perbedaan utama:
- KubernetesExecutor hanya menjalankan tugas di Cloud Composer berversi namespace. Namespace ini tidak dapat diubah di Cloud Composer. Anda dapat menentukan namespace tempat KubernetesPodOperator menjalankan tugas pod.
- KubernetesExecutor dapat menggunakan operator Airflow bawaan. KubernetesPodOperator hanya mengeksekusi skrip yang disediakan yang ditentukan oleh titik entri container.
- KubernetesExecutor menggunakan image Docker Cloud Composer default dengan Python yang sama, opsi konfigurasi Airflow menggantikan, variabel, dan paket PyPI yang ditentukan dalam lingkungan Cloud Composer.
Tentang image Docker
Secara default, KubernetesExecutor meluncurkan tugas menggunakan image Docker yang sama dengan Cloud Composer untuk pekerja Celery. Ini adalah Image Cloud Composer untuk lingkungan Anda, dengan semua perubahan yang Anda tentukan untuk lingkungan, seperti PyPI kustom paket atau variabel lingkungan.
Sebelum memulai
Anda dapat menggunakan CeleryKubernetesExecutor di Cloud Composer 3.
Tidak mungkin menggunakan eksekutor selain CeleryKubernetesExecutor di Cloud Composer 3. Ini berarti Anda dapat menjalankan tugas menggunakan CeleryExecutor, KubernetesExecutor atau keduanya dalam satu DAG, tetapi tidak dapat mengonfigurasi lingkungan Anda agar hanya menggunakan KubernetesExecutor atau Eksekutor Seledri.
Mengonfigurasi CeleryKubernetesExecutor
Anda mungkin ingin mengganti konfigurasi Airflow yang ada opsi yang terkait dengan KubernetesExecutor:
[kubernetes]worker_pods_creation_batch_size
Opsi ini menentukan jumlah panggilan pembuatan Pod Pekerja Kubernetes per {i>scheduler<i}. Nilai defaultnya adalah
1
, sehingga hanya satu pod yang diluncurkan setiap detak jantung penjadwal. Jika Anda sering menggunakan KubernetesExecutor, disarankan untuk meningkatkan nilai ini.[kubernetes]worker_pods_pending_timeout
Opsi ini menentukan, dalam detik, berapa lama pekerja dapat tetap berada di
Pending
(Pod sedang dibuat) sebelum dianggap gagal. Default nilainya adalah 5 menit.
Menjalankan tugas dengan KubernetesExecutor atau CeleryExecutor
Anda dapat menjalankan tugas menggunakan CeleryExecutor, KubernetesExecutor, atau keduanya dalam satu DAG:
- Untuk menjalankan tugas dengan KubernetesExecutor, tentukan nilai
kubernetes
di Parameterqueue
tugas. - Untuk menjalankan tugas dengan CeleryExecutor, hapus parameter
queue
.
Contoh berikut menjalankan tugas task-kubernetes
menggunakan
KubernetesExecutor dan tugas task-celery
menggunakan CeleryExecutor:
import datetime
import airflow
from airflow.operators.python_operator import PythonOperator
with airflow.DAG(
"composer_sample_celery_kubernetes",
start_date=datetime.datetime(2022, 1, 1),
schedule_interval="@daily") as dag:
def kubernetes_example():
print("This task runs using KubernetesExecutor")
def celery_example():
print("This task runs using CeleryExecutor")
# To run with KubernetesExecutor, set queue to kubernetes
task_kubernetes = PythonOperator(
task_id='task-kubernetes',
python_callable=kubernetes_example,
dag=dag,
queue='kubernetes')
# To run with CeleryExecutor, omit the queue argument
task_celery = PythonOperator(
task_id='task-celery',
python_callable=celery_example,
dag=dag)
task_kubernetes >> task_celery
Menjalankan perintah CLI Airflow yang terkait dengan KubernetesExecutor
Anda dapat menjalankan beberapa
Perintah CLI Airflow yang terkait dengan KubernetesExecutor
menggunakan gcloud
.
Menyesuaikan spesifikasi pod pekerja
Anda dapat menyesuaikan spesifikasi pod pekerja dengan meneruskannya dalam executor_config
parameter tugas. Anda dapat menggunakannya untuk menentukan memori dan CPU kustom
lainnya.
Anda dapat mengganti seluruh spesifikasi pod pekerja yang digunakan untuk menjalankan tugas. Kepada
mengambil spesifikasi pod tugas yang digunakan oleh KubernetesExecutor, Anda dapat
jalankan kubernetes generate-dag-yaml
Airflow CLI
perintah.
Untuk informasi selengkapnya tentang menyesuaikan spesifikasi pod pekerja, lihat Dokumentasi Airflow.
Contoh berikut menunjukkan tugas yang menggunakan spesifikasi pod pekerja kustom:
PythonOperator(
task_id='custom-spec-example',
python_callable=f,
dag=dag,
queue='kubernetes',
executor_config={
'pod_override': k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name='base',
resources=k8s.V1ResourceRequirements(requests={
'cpu': '500m',
'memory': '1000Mi',
})
),
],
),
)
},
)
Lihat log tugas
Log tugas yang dijalankan oleh KubernetesExecutor tersedia di tab Logs, bersama dengan log tugas yang dijalankan oleh CeleryExecutor:
Di Konsol Google Cloud, buka halaman Environments.
Pada daftar lingkungan, klik nama lingkungan Anda. Halaman Detail lingkungan akan terbuka.
Buka tab Logs.
Buka All logs > Airflow logs > Pekerja.
Pekerja bernama
airflow-k8s-worker
mengeksekusi Tugas KubernetesExecutor. Untuk mencari log tentang tugas tertentu, Anda bisa gunakan ID DAG atau ID tugas sebagai kata kunci dalam penelusuran.
Langkah selanjutnya
- Memecahkan Masalah KubernetesExecutor
- Menggunakan KubernetesPodOperator
- Menggunakan operator GKE
- Mengganti opsi konfigurasi Airflow