Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Questa pagina spiega come abilitare CeleryKubernetesExecutor in Cloud Composer e come utilizzare KubernetesExecutor nei DAG.
Informazioni su CeleryKubernetesExecutor
CeleryKubernetesExecutor è un tipo di esecutore che può utilizzare CeleryExecutor e KubernetesExecutor contemporaneamente nel tempo. Airflow seleziona l'esecutore in base alla coda che definisci per dell'attività. In un DAG, puoi eseguire alcune attività con CeleryExecutor e altre con KubernetesExecutor:
- CeleryExecutor è ottimizzato per l'esecuzione rapida e scalabile dei compiti.
- KubernetesExecutor è progettato per l'esecuzione di attività ad alta intensità di risorse delle attività in modo isolato.
CeleryKubernetesExecutor in Cloud Composer
CeleryKubernetesExecutor in Cloud Composer offre la possibilità e usare KubernetesExecutor per le tue attività. Non è possibile utilizzare KubernetesExecutor in Cloud Composer separatamente da CeleryKubernetesExecutor.
Cloud Composer esegue le attività che esegui con KubernetesExecutor nel cluster del tuo ambiente, nello stesso spazio dei nomi con i worker di Airflow. Tale hanno le stesse associazioni di Airflow e possono accedere alle risorse del tuo progetto.
Le attività che esegui con KubernetesExecutor utilizzano Modello di prezzi di Cloud Composer, poiché i pod con questi eseguite nel cluster del tuo ambiente. SKU di computing di Cloud Composer (per CPU, memoria e spazio di archiviazione) si applicano a questi pod.
Ti consigliamo di eseguire attività con CeleryExecutor quando:
- Il tempo di avvio dell'attività è importante.
- Le attività non richiedono l'isolamento del runtime e non consumano molte risorse.
Ti consigliamo di eseguire attività con KubernetesExecutor quando:
- Le attività richiedono l'isolamento del runtime. Ad esempio, affinché le attività non competano per memoria e CPU, poiché vengono eseguite nei propri pod.
- Le attività richiedono librerie di sistema aggiuntive (o pacchetti PyPI).
- Le attività richiedono molte risorse e vuoi controllare le risorse di risorse di CPU e memoria.
KubernetesExecutor a confronto con KubernetesPodOperator
L'esecuzione di attività con KubernetesExecutor è simile eseguire attività con KubernetesPodOperator. Le attività vengono eseguite in i pod, fornendo così l'isolamento delle attività a livello di pod e una migliore gestione delle risorse.
Tuttavia, ci sono alcune differenze fondamentali:
- KubernetesExecutor esegue attività solo in Cloud Composer con il controllo delle versioni del tuo ambiente. Non è possibile modificare questo spazio dei nomi in Cloud Composer. Puoi specificare uno spazio dei nomi in cui KubernetesPodOperator esegue le attività dei pod.
- KubernetesExecutor può utilizzare qualsiasi operatore Airflow integrato. KubernetesPodOperator esegue solo uno script fornito definito dal del container.
- KubernetesExecutor utilizza l'immagine Docker predefinita di Cloud Composer con lo stesso Python, gli override dell'opzione di configurazione Airflow, e pacchetti PyPI definiti nell'interfaccia nell'ambiente Cloud Composer.
Informazioni sulle immagini Docker
Per impostazione predefinita, KubernetesExecutor avvia le attività utilizzando la stessa immagine Docker che Cloud Composer utilizza i worker Celery. Questo è il immagine Cloud Composer per il tuo ambiente, con Tutte le modifiche specificate per l'ambiente, ad esempio PyPI personalizzato pacchetti o variabili di ambiente.
Prima di iniziare
Puoi utilizzare CeleryKubernetesExecutor in Cloud Composer 3.
Non è possibile utilizzare esecutori diversi da CeleryKubernetesExecutor in Cloud Composer 3. Ciò significa che puoi eseguire attività utilizzando CeleryExecutor, KubernetesExecutor o entrambi in un DAG, ma non possibile configurare il tuo ambiente in modo da usare solo KubernetesExecutor CeleryExecutor.
Configura CeleryKubernetesExecutor
Ti consigliamo di eseguire l'override della configurazione Airflow esistente relative a KubernetesExecutor:
[kubernetes]worker_pods_creation_batch_size
Questa opzione definisce il numero di chiamate di creazione di pod worker Kubernetes per loop dello scheduler. Il valore predefinito è
1
, quindi viene avviato un solo pod per heartbeat dello scheduler. Se usi KubernetesExecutor in modo intensivo, consigliato per aumentare questo valore.[kubernetes]worker_pods_pending_timeout
Questa opzione definisce, in secondi, per quanto tempo un worker può rimanere nell'
Pending
(il pod è in fase di creazione) prima di essere considerato non riuscito. Il valore predefinito è 5 minuti.
Esegui attività con KubernetesExecutor o CeleryExecutor
Puoi eseguire le attività utilizzando CeleryExecutor, KubernetesExecutor o entrambi in un DAG:
- Per eseguire un'attività con KubernetesExecutor, specifica il valore
kubernetes
nel Parametroqueue
di un'attività. - Per eseguire un'attività con CeleryExecutor, ometti il parametro
queue
.
Nell'esempio seguente viene eseguita l'attività task-kubernetes
utilizzando
KubernetesExecutor e l'attività task-celery
che utilizza CeleryExecutor:
import datetime
import airflow
from airflow.operators.python_operator import PythonOperator
with airflow.DAG(
"composer_sample_celery_kubernetes",
start_date=datetime.datetime(2022, 1, 1),
schedule_interval="@daily") as dag:
def kubernetes_example():
print("This task runs using KubernetesExecutor")
def celery_example():
print("This task runs using CeleryExecutor")
# To run with KubernetesExecutor, set queue to kubernetes
task_kubernetes = PythonOperator(
task_id='task-kubernetes',
python_callable=kubernetes_example,
dag=dag,
queue='kubernetes')
# To run with CeleryExecutor, omit the queue argument
task_celery = PythonOperator(
task_id='task-celery',
python_callable=celery_example,
dag=dag)
task_kubernetes >> task_celery
Esegui i comandi dell'interfaccia a riga di comando di Airflow relativi a KubernetesExecutor
Puoi eseguire diverse
Comandi dell'interfaccia a riga di comando Airflow correlati a KubernetesExecutor
utilizzando gcloud
.
Personalizza le specifiche dei pod worker
Puoi personalizzare le specifiche del pod worker passandole nell'executor_config
di un'attività. Puoi utilizzarlo per definire CPU e memoria personalizzate
i tuoi requisiti.
Puoi eseguire l'override dell'intera specifica del pod worker utilizzata per eseguire un'attività. A
la specifica del pod di un'attività usata da KubernetesExecutor,
esegui l'interfaccia a riga di comando kubernetes generate-dag-yaml
Airflow
.
Per ulteriori informazioni sulla personalizzazione delle specifiche dei pod worker, consulta Documentazione di Airflow.
L'esempio seguente mostra un'attività che utilizza le specifiche dei pod di worker personalizzati:
PythonOperator(
task_id='custom-spec-example',
python_callable=f,
dag=dag,
queue='kubernetes',
executor_config={
'pod_override': k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name='base',
resources=k8s.V1ResourceRequirements(requests={
'cpu': '500m',
'memory': '1000Mi',
})
),
],
),
)
},
)
Visualizza log delle attività
I log delle attività eseguite da KubernetesExecutor sono disponibili nella scheda Log, insieme ai log delle attività eseguite da CeleryExecutor:
Nella console Google Cloud, vai alla pagina Ambienti.
Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Si apre la pagina Dettagli ambiente.
Vai alla scheda Log.
Vai a Tutti i log > Log di Airflow > Lavoratori.
I worker denominati
airflow-k8s-worker
vengono eseguiti attività di KubernetesExecutor. Per cercare i log di un'attività specifica, puoi Utilizzare un ID DAG o un ID attività come parola chiave nella ricerca.
Passaggi successivi
- Risoluzione dei problemi di KubernetesExecutor
- Utilizzo di KubernetesPodOperator
- Utilizzo degli operatori GKE
- Ignorare le opzioni di configurazione di Airflow