Utilizzare CeleryKubernetesExecutor

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Questa pagina spiega come attivare CeleryKubernetesExecutor in Cloud Composer e come utilizzare KubernetesExecutor nei DAG.

Informazioni su CeleryKubernetesExecutor

CeleryKubernetesExecutor è un tipo di Executor che può utilizzare contemporaneamente CeleryExecutor e KubernetesExecutor. Airflow seleziona l'executor in base alla coda che definisci per l'attività. In un DAG, puoi eseguire alcune attività con CeleryExecutor e altre con KubernetesExecutor:

  • CeleryExecutor è ottimizzato per l'esecuzione rapida e scalabile delle attività.
  • KubernetesExecutor è progettato per l'esecuzione di attività che richiedono molte risorse e per eseguire attività in isolamento.

CeleryKubernetesExecutor in Cloud Composer

CeleryKubernetesExecutor in Cloud Composer ti consente di utilizzare KubernetesExecutor per le tue attività. Non è possibile utilizzare KubernetesExecutor in Cloud Composer separatamente da CeleryKubernetesExecutor.

Cloud Composer esegue le attività che esegui con KubernetesExecutor nel cluster del tuo ambiente, nello stesso spazio dei nomi dei worker Airflow. Queste attività hanno le stesse associazioni dei worker Airflow e possono accedere alle risorse del progetto.

Le attività che esegui con KubernetesExecutor utilizzano il modello di prezzi di Cloud Composer, poiché i pod con queste attività vengono eseguiti nel cluster del tuo ambiente. A questi pod si applicano gli SKU di Cloud Composer Compute (per CPU, memoria e spazio di archiviazione).

Ti consigliamo di eseguire le attività con CeleryExecutor quando:

  • Il momento di inizio dell'attività è importante.
  • Le attività non richiedono l'isolamento in fase di esecuzione e non richiedono molte risorse.

Consigliamo di eseguire attività con KubernetesExecutor quando:

  • Le attività richiedono l'isolamento del runtime. Ad esempio, in modo che le attività non competano per la memoria e la CPU, poiché vengono eseguite nei propri pod.
  • Le attività richiedono molte risorse e vuoi controllare le risorse di CPU e memoria disponibili.

KubernetesExecutor rispetto a KubernetesPodOperator

L'esecuzione di attività con KubernetesExecutor è simile all'esecuzione di attività utilizzando KubernetesPodOperator. Le attività vengono eseguite in pod, garantendo così l'isolamento delle attività a livello di pod e una migliore gestione delle risorse.

Tuttavia, ci sono alcune differenze fondamentali:

  • KubernetesExecutor esegue le attività solo nello spazio dei nomi Cloud Composer con versione del tuo ambiente. Non è possibile modificare questo spazio dei nomi in Cloud Composer. Puoi specificare uno spazio dei nomi in cui KubernetesPodOperator esegue le attività dei pod.
  • KubernetesExecutor può utilizzare qualsiasi operatore Airflow integrato. KubernetesPodOperator esegue solo uno script fornito definito dall'entrypoint del container.
  • KubernetesExecutor utilizza l'immagine Docker Cloud Composer predefinita con gli stessi override delle opzioni di configurazione di Python e Airflow, le stesse variabili di ambiente e gli stessi pacchetti PyPI definiti nel tuo ambiente Cloud Composer.

Informazioni sulle immagini Docker

Per impostazione predefinita, KubernetesExecutor avvia le attività utilizzando la stessa immagine Docker utilizzata da Cloud Composer per i worker Celery. Si tratta dell'immagine Cloud Composer per il tuo ambiente, con tutte le modifiche che hai specificato per l'ambiente, ad esempio pacchetti PyPI personalizzati o variabili di ambiente.

Prima di iniziare

  • Puoi utilizzare CeleryKubernetesExecutor in Cloud Composer 3.

  • In Cloud Composer 3 non è possibile utilizzare un altro executor diverso da CeleryKubernetesExecutor. Ciò significa che puoi eseguire attività utilizzando CeleryExecutor, KubernetesExecutor o entrambi in un DAG, ma non è possibile configurare l'ambiente in modo da utilizzare solo KubernetesExecutor o CeleryExecutor.

Configura CeleryKubernetesExecutor

Ti consigliamo di sostituire le opzioni di configurazione di Airflow esistenti relative a KubernetesExecutor:

  • [kubernetes]worker_pods_creation_batch_size

    Questa opzione definisce il numero di chiamate di creazione di pod di lavoro Kubernetes per loop di pianificatore. Il valore predefinito è 1, pertanto viene avviato un solo pod per heartbeat dello scheduler. Se utilizzi KubernetesExecutor in modo intensivo, ti consigliamo di aumentare questo valore.

  • [kubernetes]worker_pods_pending_timeout

    Questa opzione definisce, in secondi, il tempo per cui un worker può rimanere nello stato Pending (il pod è in fase di creazione) prima di essere considerato non riuscito. Il valore predefinito è 5 minuti.

Esegui attività con KubernetesExecutor o CeleryExecutor

Puoi eseguire attività utilizzando CeleryExecutor, KubernetesExecutor o entrambi in un unico DAG:

  • Per eseguire un'attività con KubernetesExecutor, specifica il valore kubernetes nel parametro queue di un'attività.
  • Per eseguire un'attività con CeleryExecutor, ometti il parametro queue.

L'esempio seguente esegue il compito task-kubernetes utilizzando KubernetesExecutor e il compito task-celery utilizzando CeleryExecutor:

import datetime
import airflow
from airflow.operators.python_operator import PythonOperator

with airflow.DAG(
  "composer_sample_celery_kubernetes",
  start_date=datetime.datetime(2022, 1, 1),
  schedule_interval="@daily") as dag:

  def kubernetes_example():
      print("This task runs using KubernetesExecutor")

  def celery_example():
      print("This task runs using CeleryExecutor")

  # To run with KubernetesExecutor, set queue to kubernetes
  task_kubernetes = PythonOperator(
    task_id='task-kubernetes',
    python_callable=kubernetes_example,
    dag=dag,
    queue='kubernetes')

  # To run with CeleryExecutor, omit the queue argument
  task_celery = PythonOperator(
    task_id='task-celery',
    python_callable=celery_example,
    dag=dag)

  task_kubernetes >> task_celery

Esegui i comandi dell'interfaccia a riga di comando Airflow relativi a KubernetesExecutor

Puoi eseguire diversi comandi CLI di Airflow relativi a KubernetesExecutor utilizzando gcloud.

Personalizzare la specifica del pod worker

Puoi personalizzare la specifica del pod di lavoro passandola nel parametro executor_config di un'attività. Puoi utilizzarlo per definire requisiti personalizzati per CPU e memoria.

Puoi eseguire l'override dell'intera specifica del pod di worker utilizzata per eseguire un'attività. Per recuperare la specifica del pod di un'attività utilizzata da KubernetesExecutor, puoi eseguire il comando kubernetes generate-dag-yaml della CLI di Airflow.

Per ulteriori informazioni sulla personalizzazione della specifica del pod di lavoro, consulta la documentazione di Airflow.

Cloud Composer 3 supporta i seguenti valori per i requisiti delle risorse:

Risorsa Minimo Massimo Passaggio
CPU 0,25 32 Valori del passo: 0,25, 0,5, 1, 2, 4, 6, 8, 10, ..., 32. I valori richiesti vengono arrotondati per eccesso al valore del passaggio supportato più vicino (ad es. da 5 a 6).
Memoria 2G (GB) 128G (GB) Valori del passo: 2, 3, 4, 5, ..., 128. I valori richiesti vengono arrotondati per eccesso al valore del passaggio supportato più vicino (ad esempio, da 3,5 G a 4G).
Archiviazione - 100G (GB) Qualsiasi valore. Se vengono richiesti più di 100 GB, vengono forniti solo 100 GB.

Per ulteriori informazioni sulle unità di risorse in Kubernetes, consulta Unità di risorse in Kubernetes.

L'esempio seguente mostra un'attività che utilizza la specifica del pod di worker personalizzato:

PythonOperator(
    task_id='custom-spec-example',
    python_callable=f,
    dag=dag,
    queue='kubernetes',
    executor_config={
        'pod_override': k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',
                        resources=k8s.V1ResourceRequirements(requests={
                            'cpu': '0.5',
                            'memory': '2G',
                        })
                    ),
                ],
            ),
        )
    },
)

Visualizza i log delle attività

I log delle attività eseguite da KubernetesExecutor sono disponibili nella scheda Log, insieme ai log delle attività eseguite da CeleryExecutor:

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Viene visualizzata la pagina Dettagli dell'ambiente.

  3. Vai alla scheda Log.

  4. Vai a Tutti i log > Log di Airflow > Worker.

  5. I worker denominati airflow-k8s-worker eseguono le attività KubernetesExecutor. Per cercare i log di un'attività specifica, puoi utilizzare un ID DAG o un ID attività come parola chiave nella ricerca.

Passaggi successivi