Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Questa pagina spiega come abilitare CeleryKubernetesExecutor in Cloud Composer e come utilizzare KubernetesExecutor nei DAG.
Informazioni su CeleryKubernetesExecutor
CeleryKubernetesExecutor è un tipo di esecutore che può utilizzare CeleryExecutor e KubernetesExecutor contemporaneamente. Airflow seleziona l'esecutore in base alla coda che definisci per l'attività. In un DAG, puoi eseguire alcune attività con CeleryExecutor e altre attività con KubernetesExecutor:
- CeleryExecutor è ottimizzato per l'esecuzione rapida e scalabile dei compiti.
- KubernetesExecutor è progettato per l'esecuzione di attività ad alta intensità di risorse e per l'esecuzione di attività in modo isolato.
CeleryKubernetesExecutor in Cloud Composer
CeleryKubernetesExecutor in Cloud Composer offre la possibilità di utilizzare KubernetesExecutor per le attività. Non è possibile utilizzare KubernetesExecutor in Cloud Composer separatamente da CeleryKubernetesExecutor.
Cloud Composer esegue le attività che esegui con KubernetesExecutor nel cluster del tuo ambiente, nello stesso spazio dei nomi con i worker di Airflow. Queste attività hanno le stesse associazioni dei worker Airflow e possono accedere alle risorse nel tuo progetto.
Le attività eseguite con KubernetesExecutor utilizzano il modello di prezzi di Cloud Composer, poiché i pod con queste attività vengono eseguiti nel cluster del tuo ambiente. A questi pod si applicano gli SKU di computing di Cloud Composer (per CPU, memoria e spazio di archiviazione).
Ti consigliamo di eseguire attività con CeleryExecutor quando:
- Il tempo di avvio dell'attività è importante.
- Le attività non richiedono l'isolamento del runtime e non consumano molte risorse.
Ti consigliamo di eseguire attività con KubernetesExecutor quando:
- Le attività richiedono l'isolamento del runtime. ad esempio in modo che le attività non competano per memoria e CPU, poiché vengono eseguite nei propri pod.
- Le attività richiedono librerie di sistema aggiuntive (o pacchetti PyPI).
- Le attività richiedono molte risorse e vuoi controllare le risorse di CPU e memoria disponibili.
KubernetesExecutor a confronto con KubernetesPodOperator
L'esecuzione di attività con KubernetesExecutor è simile all'esecuzione di attività con KubernetesPodOperator. Le attività vengono eseguite nei pod, fornendo così l'isolamento a livello di pod e una migliore gestione delle risorse.
Tuttavia, ci sono alcune differenze fondamentali:
- KubernetesExecutor esegue attività solo nello spazio dei nomi Cloud Composer con le versioni del tuo ambiente. Non è possibile modificare questo spazio dei nomi in Cloud Composer. Puoi specificare uno spazio dei nomi in cui KubernetesPodOperator esegue le attività dei pod.
- KubernetesExecutor può utilizzare qualsiasi operatore Airflow integrato. KubernetesPodOperator esegue solo uno script fornito definito dal punto di ingresso del container.
- KubernetesExecutor utilizza l'immagine Docker Cloud Composer predefinita con gli stessi override delle opzioni di configurazione Python, Airflow, variabili di ambiente e pacchetti PyPI definiti nel tuo ambiente Cloud Composer.
Informazioni sulle immagini Docker
Per impostazione predefinita, KubernetesExecutor avvia le attività utilizzando la stessa immagine Docker utilizzata da Cloud Composer per i worker Celery. Questa è l'immagine Cloud Composer per il tuo ambiente, con tutte le modifiche specificate per l'ambiente, ad esempio i pacchetti PyPI personalizzati o le variabili di ambiente.
Prima di iniziare
Puoi utilizzare CeleryKubernetesExecutor in Cloud Composer 3.
Non è possibile utilizzare esecutori diversi da CeleryKubernetesExecutor in Cloud Composer 3. Ciò significa che puoi eseguire attività utilizzando CeleryExecutor, KubernetesExecutor o entrambi in un DAG, ma non è possibile configurare il tuo ambiente per utilizzare solo KubernetesExecutor o CeleryExecutor.
Configura CeleryKubernetesExecutor
Potrebbe essere utile eseguire l'override delle opzioni di configurazione Airflow esistenti correlate a KubernetesExecutor:
[kubernetes]worker_pods_creation_batch_size
Questa opzione definisce il numero di chiamate di creazione di pod worker Kubernetes per loop dello scheduler. Il valore predefinito è
1
, quindi viene avviato un solo pod per heartbeat dello scheduler. Se usi KubernetesExecutor in modo massiccio, consigliamo di aumentare questo valore.[kubernetes]worker_pods_pending_timeout
Questa opzione definisce, in secondi, per quanto tempo un worker può rimanere nello stato
Pending
(è in corso la creazione del pod) prima che venga considerato non riuscito. Il valore predefinito è 5 minuti.
Esegui attività con KubernetesExecutor o CeleryExecutor
Puoi eseguire le attività utilizzando CeleryExecutor, KubernetesExecutor o entrambi in un DAG:
- Per eseguire un'attività con KubernetesExecutor, specifica il valore
kubernetes
nel parametroqueue
di un'attività. - Per eseguire un'attività con CeleryExecutor, ometti il parametro
queue
.
Nell'esempio seguente viene eseguita l'attività task-kubernetes
utilizzando
KubernetesExecutor e l'attività task-celery
utilizzando CeleryExecutor:
import datetime
import airflow
from airflow.operators.python_operator import PythonOperator
with airflow.DAG(
"composer_sample_celery_kubernetes",
start_date=datetime.datetime(2022, 1, 1),
schedule_interval="@daily") as dag:
def kubernetes_example():
print("This task runs using KubernetesExecutor")
def celery_example():
print("This task runs using CeleryExecutor")
# To run with KubernetesExecutor, set queue to kubernetes
task_kubernetes = PythonOperator(
task_id='task-kubernetes',
python_callable=kubernetes_example,
dag=dag,
queue='kubernetes')
# To run with CeleryExecutor, omit the queue argument
task_celery = PythonOperator(
task_id='task-celery',
python_callable=celery_example,
dag=dag)
task_kubernetes >> task_celery
Esegui i comandi dell'interfaccia a riga di comando di Airflow relativi a KubernetesExecutor
Puoi eseguire diversi
comandi dell'interfaccia a riga di comando Airflow relativi a KubernetesExecutor
utilizzando gcloud
.
Personalizza le specifiche dei pod worker
Puoi personalizzare le specifiche del pod worker passandole nel parametro executor_config
di un'attività. Puoi usarlo per definire requisiti di CPU e memoria
personalizzati.
Puoi eseguire l'override dell'intera specifica del pod worker utilizzata per eseguire un'attività. Per recuperare la specifica del pod di un'attività utilizzata da KubernetesExecutor, puoi eseguire il comando kubernetes generate-dag-yaml
dell'interfaccia a riga di comando di Airflow.
Per ulteriori informazioni sulla personalizzazione delle specifiche dei pod worker, consulta la documentazione di Airflow.
L'esempio seguente mostra un'attività che utilizza le specifiche dei pod di worker personalizzati:
PythonOperator(
task_id='custom-spec-example',
python_callable=f,
dag=dag,
queue='kubernetes',
executor_config={
'pod_override': k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name='base',
resources=k8s.V1ResourceRequirements(requests={
'cpu': '500m',
'memory': '1000Mi',
})
),
],
),
)
},
)
Visualizza log delle attività
I log delle attività eseguite da KubernetesExecutor sono disponibili nella scheda Log, insieme a quelli delle attività eseguite da CeleryExecutor:
Nella console Google Cloud, vai alla pagina Ambienti.
Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Si apre la pagina Dettagli ambiente.
Vai alla scheda Log.
Vai a Tutti i log > Log di Airflow > Worker.
I worker denominati
airflow-k8s-worker
eseguono le attività KubernetesExecutor. Per cercare i log di un'attività specifica, puoi utilizzare un ID DAG o un ID attività come parola chiave nella ricerca.
Passaggi successivi
- Risoluzione dei problemi di KubernetesExecutor
- Utilizzo di KubernetesPodOperator
- Utilizzo degli operatori GKE
- Ignorare le opzioni di configurazione di Airflow