Utilizza CeleryKubernetesExecutor

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Questa pagina spiega come abilitare CeleryKubernetesExecutor in Cloud Composer e come utilizzare KubernetesExecutor nei DAG.

Informazioni su CeleryKubernetesExecutor

CeleryKubernetesExecutor è un tipo di esecutore che può utilizzare CeleryExecutor e KubernetesExecutor contemporaneamente nel tempo. Airflow seleziona l'esecutore in base alla coda che definisci per dell'attività. In un DAG, puoi eseguire alcune attività con CeleryExecutor e altre con KubernetesExecutor:

  • CeleryExecutor è ottimizzato per l'esecuzione rapida e scalabile dei compiti.
  • KubernetesExecutor è progettato per l'esecuzione di attività ad alta intensità di risorse delle attività in modo isolato.

CeleryKubernetesExecutor in Cloud Composer

CeleryKubernetesExecutor in Cloud Composer offre la possibilità e usare KubernetesExecutor per le tue attività. Non è possibile utilizzare KubernetesExecutor in Cloud Composer separatamente da CeleryKubernetesExecutor.

Cloud Composer esegue le attività che esegui con KubernetesExecutor nel cluster del tuo ambiente, nello stesso spazio dei nomi con i worker di Airflow. Tale hanno le stesse associazioni di Airflow e possono accedere alle risorse del tuo progetto.

Le attività che esegui con KubernetesExecutor utilizzano Modello di prezzi di Cloud Composer, poiché i pod con questi eseguite nel cluster del tuo ambiente. SKU di computing di Cloud Composer (per CPU, memoria e spazio di archiviazione) si applicano a questi pod.

Ti consigliamo di eseguire attività con CeleryExecutor quando:

  • Il tempo di avvio dell'attività è importante.
  • Le attività non richiedono l'isolamento del runtime e non consumano molte risorse.

Ti consigliamo di eseguire attività con KubernetesExecutor quando:

  • Le attività richiedono l'isolamento del runtime. Ad esempio, affinché le attività non competano per memoria e CPU, poiché vengono eseguite nei propri pod.
  • Le attività richiedono librerie di sistema aggiuntive (o pacchetti PyPI).
  • Le attività richiedono molte risorse e vuoi controllare le risorse di risorse di CPU e memoria.

KubernetesExecutor a confronto con KubernetesPodOperator

L'esecuzione di attività con KubernetesExecutor è simile eseguire attività con KubernetesPodOperator. Le attività vengono eseguite in i pod, fornendo così l'isolamento delle attività a livello di pod e una migliore gestione delle risorse.

Tuttavia, ci sono alcune differenze fondamentali:

  • KubernetesExecutor esegue attività solo in Cloud Composer con il controllo delle versioni del tuo ambiente. Non è possibile modificare questo spazio dei nomi in Cloud Composer. Puoi specificare uno spazio dei nomi in cui KubernetesPodOperator esegue le attività dei pod.
  • KubernetesExecutor può utilizzare qualsiasi operatore Airflow integrato. KubernetesPodOperator esegue solo uno script fornito definito dal del container.
  • KubernetesExecutor utilizza l'immagine Docker predefinita di Cloud Composer con lo stesso Python, gli override dell'opzione di configurazione Airflow, e pacchetti PyPI definiti nell'interfaccia nell'ambiente Cloud Composer.

Informazioni sulle immagini Docker

Per impostazione predefinita, KubernetesExecutor avvia le attività utilizzando la stessa immagine Docker che Cloud Composer utilizza i worker Celery. Questo è il immagine Cloud Composer per il tuo ambiente, con Tutte le modifiche specificate per l'ambiente, ad esempio PyPI personalizzato pacchetti o variabili di ambiente.

Prima di iniziare

  • Puoi utilizzare CeleryKubernetesExecutor in Cloud Composer 3.

  • Non è possibile utilizzare esecutori diversi da CeleryKubernetesExecutor in Cloud Composer 3. Ciò significa che puoi eseguire attività utilizzando CeleryExecutor, KubernetesExecutor o entrambi in un DAG, ma non possibile configurare il tuo ambiente in modo da usare solo KubernetesExecutor CeleryExecutor.

Configura CeleryKubernetesExecutor

Ti consigliamo di eseguire l'override della configurazione Airflow esistente relative a KubernetesExecutor:

  • [kubernetes]worker_pods_creation_batch_size

    Questa opzione definisce il numero di chiamate di creazione di pod worker Kubernetes per loop dello scheduler. Il valore predefinito è 1, quindi viene avviato un solo pod per heartbeat dello scheduler. Se usi KubernetesExecutor in modo intensivo, consigliato per aumentare questo valore.

  • [kubernetes]worker_pods_pending_timeout

    Questa opzione definisce, in secondi, per quanto tempo un worker può rimanere nell'Pending (il pod è in fase di creazione) prima di essere considerato non riuscito. Il valore predefinito è 5 minuti.

Esegui attività con KubernetesExecutor o CeleryExecutor

Puoi eseguire le attività utilizzando CeleryExecutor, KubernetesExecutor o entrambi in un DAG:

  • Per eseguire un'attività con KubernetesExecutor, specifica il valore kubernetes nel Parametro queue di un'attività.
  • Per eseguire un'attività con CeleryExecutor, ometti il parametro queue.

Nell'esempio seguente viene eseguita l'attività task-kubernetes utilizzando KubernetesExecutor e l'attività task-celery che utilizza CeleryExecutor:

import datetime
import airflow
from airflow.operators.python_operator import PythonOperator

with airflow.DAG(
  "composer_sample_celery_kubernetes",
  start_date=datetime.datetime(2022, 1, 1),
  schedule_interval="@daily") as dag:

  def kubernetes_example():
      print("This task runs using KubernetesExecutor")

  def celery_example():
      print("This task runs using CeleryExecutor")

  # To run with KubernetesExecutor, set queue to kubernetes
  task_kubernetes = PythonOperator(
    task_id='task-kubernetes',
    python_callable=kubernetes_example,
    dag=dag,
    queue='kubernetes')

  # To run with CeleryExecutor, omit the queue argument
  task_celery = PythonOperator(
    task_id='task-celery',
    python_callable=celery_example,
    dag=dag)

  task_kubernetes >> task_celery

Esegui i comandi dell'interfaccia a riga di comando di Airflow relativi a KubernetesExecutor

Puoi eseguire diverse Comandi dell'interfaccia a riga di comando Airflow correlati a KubernetesExecutor utilizzando gcloud.

Personalizza le specifiche dei pod worker

Puoi personalizzare le specifiche del pod worker passandole nell'executor_config di un'attività. Puoi utilizzarlo per definire CPU e memoria personalizzate i tuoi requisiti.

Puoi eseguire l'override dell'intera specifica del pod worker utilizzata per eseguire un'attività. A la specifica del pod di un'attività usata da KubernetesExecutor, esegui l'interfaccia a riga di comando kubernetes generate-dag-yaml Airflow .

Per ulteriori informazioni sulla personalizzazione delle specifiche dei pod worker, consulta Documentazione di Airflow.

L'esempio seguente mostra un'attività che utilizza le specifiche dei pod di worker personalizzati:

PythonOperator(
    task_id='custom-spec-example',
    python_callable=f,
    dag=dag,
    queue='kubernetes',
    executor_config={
        'pod_override': k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',
                        resources=k8s.V1ResourceRequirements(requests={
                            'cpu': '500m',
                            'memory': '1000Mi',
                        })
                    ),
                ],
            ),
        )
    },
)

Visualizza log delle attività

I log delle attività eseguite da KubernetesExecutor sono disponibili nella scheda Log, insieme ai log delle attività eseguite da CeleryExecutor:

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Si apre la pagina Dettagli ambiente.

  3. Vai alla scheda Log.

  4. Vai a Tutti i log > Log di Airflow > Lavoratori.

  5. I worker denominati airflow-k8s-worker vengono eseguiti attività di KubernetesExecutor. Per cercare i log di un'attività specifica, puoi Utilizzare un ID DAG o un ID attività come parola chiave nella ricerca.

Passaggi successivi