Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Halaman ini menjelaskan cara mengaktifkan CeleryKubernetesExecutor di Cloud Composer dan cara menggunakan KubernetesExecutor di DAG Anda.
Tentang CeleryKubernetesExecutor
CeleryKubernetesExecutor adalah jenis eksekutor yang dapat menggunakan CeleryExecutor dan KubernetesExecutor secara bersamaan. Airflow memilih eksekutor berdasarkan antrean yang Anda tentukan untuk tugas. Dalam satu DAG, Anda dapat menjalankan beberapa tugas dengan CeleryExecutor, dan tugas lainnya dengan KubernetesExecutor:
- CeleryExecutor dioptimalkan untuk eksekusi tugas yang cepat dan skalabel.
- KubernetesExecutor dirancang untuk menjalankan tugas yang membutuhkan banyak resource dan menjalankan tugas secara terpisah.
CeleryKubernetesExecutor di Cloud Composer
CeleryKubernetesExecutor di Cloud Composer memberikan kemampuan untuk menggunakan KubernetesExecutor untuk tugas Anda. Anda tidak dapat menggunakan KubernetesExecutor di Cloud Composer secara terpisah dari CeleryKubernetesExecutor.
Cloud Composer menjalankan tugas yang Anda jalankan dengan KubernetesExecutor di cluster lingkungan, dalam namespace yang sama dengan pekerja Airflow. Tugas tersebut memiliki binding yang sama dengan pekerja Airflow dan dapat mengakses resource dalam project Anda.
Tugas yang Anda jalankan dengan KubernetesExecutor menggunakan model penetapan harga Cloud Composer, karena pod dengan tugas ini berjalan di cluster lingkungan Anda. SKU Compute Cloud Composer (untuk CPU, Memori, dan Penyimpanan) berlaku untuk pod ini.
Sebaiknya jalankan tugas dengan CeleryExecutor saat:
- Waktu mulai tugas sangat penting.
- Tugas tidak memerlukan isolasi runtime dan tidak memerlukan banyak resource.
Sebaiknya jalankan tugas dengan KubernetesExecutor jika:
- Tugas memerlukan isolasi runtime. Misalnya, agar tugas tidak bersaing untuk memori dan CPU, karena tugas tersebut berjalan di pod-nya sendiri.
- Tugas memerlukan library sistem tambahan (atau paket PyPI).
- Tugas membutuhkan banyak resource dan Anda ingin mengontrol resource CPU dan memori yang tersedia.
KubernetesExecutor dibandingkan dengan KubernetesPodOperator
Menjalankan tugas dengan KubernetesExecutor mirip dengan menjalankan tugas menggunakan KubernetesPodOperator. Tugas dieksekusi di pod, sehingga memberikan isolasi tugas tingkat pod dan pengelolaan resource yang lebih baik.
Namun, ada beberapa perbedaan utama:
- KubernetesExecutor hanya menjalankan tugas di namespace Cloud Composer berversi lingkungan Anda. Anda tidak dapat mengubah namespace ini di Cloud Composer. Anda dapat menentukan namespace tempat KubernetesPodOperator menjalankan tugas pod.
- KubernetesExecutor dapat menggunakan operator Airflow bawaan. KubernetesPodOperator hanya menjalankan skrip yang disediakan dan ditentukan oleh titik entri penampung.
- KubernetesExecutor menggunakan image Docker Cloud Composer default dengan Python yang sama, penggantian opsi konfigurasi Airflow, variabel lingkungan, dan paket PyPI yang ditentukan di lingkungan Cloud Composer Anda.
Tentang image Docker
Secara default, KubernetesExecutor meluncurkan tugas menggunakan image Docker yang sama dengan yang digunakan Cloud Composer untuk pekerja Celery. Ini adalah image Cloud Composer untuk lingkungan Anda, dengan semua perubahan yang Anda tentukan untuk lingkungan Anda, seperti paket PyPI kustom atau variabel lingkungan.
Sebelum memulai
Anda dapat menggunakan CeleryKubernetesExecutor di Cloud Composer 3.
Anda tidak dapat menggunakan eksekutor selain CeleryKubernetesExecutor di Cloud Composer 3. Artinya, Anda dapat menjalankan tugas menggunakan CeleryExecutor, KubernetesExecutor, atau keduanya dalam satu DAG, tetapi Anda tidak dapat mengonfigurasi lingkungan untuk hanya menggunakan KubernetesExecutor atau CeleryExecutor.
Mengonfigurasi CeleryKubernetesExecutor
Sebaiknya ganti opsi konfigurasi Airflow yang ada dan terkait dengan KubernetesExecutor:
[kubernetes]worker_pods_creation_batch_size
Opsi ini menentukan jumlah panggilan pembuatan Pod Pekerja Kubernetes per loop penjadwal. Nilai defaultnya adalah
1
, sehingga hanya satu pod yang diluncurkan per heartbeat penjadwal. Jika Anda menggunakan KubernetesExecutor secara intensif, sebaiknya naikkan nilai ini.[kubernetes]worker_pods_pending_timeout
Opsi ini menentukan, dalam detik, berapa lama pekerja dapat berada dalam status
Pending
(Pod sedang dibuat) sebelum dianggap gagal. Nilai default-nya adalah 5 menit.
Menjalankan tugas dengan KubernetesExecutor atau CeleryExecutor
Anda dapat menjalankan tugas menggunakan CeleryExecutor, KubernetesExecutor, atau keduanya dalam satu DAG:
- Untuk menjalankan tugas dengan KubernetesExecutor, tentukan nilai
kubernetes
dalam parameterqueue
tugas. - Untuk menjalankan tugas dengan CeleryExecutor, hapus parameter
queue
.
Contoh berikut menjalankan tugas task-kubernetes
menggunakan
KubernetesExecutor dan tugas task-celery
menggunakan CeleryExecutor:
import datetime
import airflow
from airflow.operators.python_operator import PythonOperator
with airflow.DAG(
"composer_sample_celery_kubernetes",
start_date=datetime.datetime(2022, 1, 1),
schedule_interval="@daily") as dag:
def kubernetes_example():
print("This task runs using KubernetesExecutor")
def celery_example():
print("This task runs using CeleryExecutor")
# To run with KubernetesExecutor, set queue to kubernetes
task_kubernetes = PythonOperator(
task_id='task-kubernetes',
python_callable=kubernetes_example,
dag=dag,
queue='kubernetes')
# To run with CeleryExecutor, omit the queue argument
task_celery = PythonOperator(
task_id='task-celery',
python_callable=celery_example,
dag=dag)
task_kubernetes >> task_celery
Menjalankan perintah CLI Airflow yang terkait dengan KubernetesExecutor
Anda dapat menjalankan beberapa
perintah CLI Airflow yang terkait dengan KubernetesExecutor
menggunakan gcloud
.
Menyesuaikan spesifikasi pod pekerja
Anda dapat menyesuaikan spesifikasi pod pekerja dengan meneruskannya dalam parameter executor_config
tugas. Anda dapat menggunakannya untuk menentukan persyaratan CPU dan memori
kustom.
Anda dapat mengganti seluruh spesifikasi pod pekerja yang digunakan untuk menjalankan tugas. Untuk
mengambil spesifikasi pod tugas yang digunakan oleh KubernetesExecutor, Anda dapat
menjalankan perintah CLI Airflow kubernetes generate-dag-yaml
.
Untuk informasi selengkapnya tentang cara menyesuaikan spesifikasi pod pekerja, lihat dokumentasi Airflow.
Contoh berikut menunjukkan tugas yang menggunakan spesifikasi pod pekerja kustom:
PythonOperator(
task_id='custom-spec-example',
python_callable=f,
dag=dag,
queue='kubernetes',
executor_config={
'pod_override': k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name='base',
resources=k8s.V1ResourceRequirements(requests={
'cpu': '500m',
'memory': '1000Mi',
})
),
],
),
)
},
)
Melihat log tugas
Log tugas yang dijalankan oleh KubernetesExecutor tersedia di tab Logs, bersama dengan log tugas yang dijalankan oleh CeleryExecutor:
Di konsol Google Cloud, buka halaman Environments.
Di daftar lingkungan, klik nama lingkungan Anda. Halaman Environment details akan terbuka.
Buka tab Logs.
Buka Semua log > Log Airflow > Pekerja.
Pekerja bernama
airflow-k8s-worker
menjalankan tugas KubernetesExecutor. Untuk mencari log tugas tertentu, Anda dapat menggunakan ID DAG atau ID tugas sebagai kata kunci dalam penelusuran.
Langkah selanjutnya
- Memecahkan masalah KubernetesExecutor
- Menggunakan KubernetesPodOperator
- Menggunakan operator GKE
- Mengganti opsi konfigurasi Airflow