Cloud Composer 1 | Cloud Composer 2
Questa pagina descrive come utilizzare KubernetesPodOperator
per eseguire il deployment dei pod Kubernetes da Cloud Composer nel cluster Google Kubernetes Engine che fa parte del tuo ambiente Cloud Composer e per garantire che l'ambiente disponga delle risorse appropriate.
KubernetesPodOperator
avvia i pod Kubernetes
nel cluster del tuo ambiente. In confronto, gli operatori di Google Kubernetes Engine eseguono i pod Kubernetes in un cluster specificato, che può essere un cluster separato non correlato al tuo ambiente. Puoi anche creare ed eliminare
cluster utilizzando gli operatori di Google Kubernetes Engine.
KubernetesPodOperator
è una buona opzione se hai bisogno di:
- Dipendenze Python personalizzate che non sono disponibili tramite il repository PyPI pubblico.
- Dipendenze binarie non disponibili nell'immagine worker di Cloud Composer.
Questa pagina ti guida in un esempio di DAG Airflow che include le seguenti configurazioni di KubernetesPodOperator
:
- Configurazione minima: imposta solo i parametri richiesti.
- Configurazione modello: utilizza i parametri che puoi modellare con Jinja.
- Configurazione delle variabili secret: trasmette un oggetto Secret di Kubernetes al pod.
- La configurazione dell'affinità pod non è disponibile in Cloud Composer 2. Utilizza invece gli operatori GKE per avviare i pod in un cluster diverso.
Configurazione completa: include tutte le configurazioni.
Prima di iniziare
In Cloud Composer 2, il cluster del tuo ambiente scala automaticamente. I carichi di lavoro aggiuntivi eseguiti utilizzando
KubernetesPodOperator
scalano in modo indipendente dal tuo ambiente. Il tuo ambiente non è interessato dall'aumento della domanda di risorse, ma il suo cluster fa lo scale up e lo scale down a seconda della domanda di risorse. I prezzi per i carichi di lavoro aggiuntivi che esegui nel cluster del tuo ambiente seguono il modello di prezzi di Cloud Composer 2 e utilizzano gli SKU di Cloud Composer.I cluster Cloud Composer 2 utilizzano Workload Identity. Per impostazione predefinita, i pod eseguiti in uno spazio dei nomi appena creato o nello spazio dei nomi
composer-user-workloads
non possono accedere alle risorse di Google Cloud. Quando utilizzi Workload Identity, gli account di servizio Kubernetes associati agli spazi dei nomi devono essere mappati agli account di servizio Google Cloud per abilitare l'autorizzazione dell'identità del servizio per le richieste alle API di Google e ad altri servizi.Per questo motivo, se esegui pod nello spazio dei nomi
composer-user-workloads
o in uno spazio dei nomi appena creato nel cluster del tuo ambiente, le associazioni IAM corrette tra gli account di servizio Kubernetes e Google Cloud non vengono create e questi pod non possono accedere alle risorse del tuo progetto Google Cloud.Se vuoi che i tuoi pod abbiano accesso alle risorse Google Cloud, utilizza lo spazio dei nomi
composer-user-workloads
o crea il tuo spazio dei nomi come descritto ulteriormente.Per concedere l'accesso alle risorse del tuo progetto, segui le indicazioni in Workload Identity e configura le associazioni:
- Crea uno spazio dei nomi separato nel cluster del tuo ambiente.
- Crea un'associazione tra l'account di servizio Kubernetes
composer-user-workloads/<namespace_name>
e l'account di servizio del tuo ambiente. - Aggiungi l'annotazione dell'account di servizio del tuo ambiente all'account di servizio Kubernetes.
- Quando utilizzi
KubernetesPodOperator
, specifica lo spazio dei nomi e l'account di servizio Kubernetes nei parametrinamespace
eservice_account_name
.
Se utilizzi la versione 5.0.0 del provider CNCF Kubernetes, segui le istruzioni documentate nella sezione sui provider di Kubernetes del CNCF.
Cloud Composer 2 utilizza i cluster GKE con Workload Identity. Il server di metadati GKE richiede alcuni secondi per iniziare ad accettare richieste su un pod appena creato. Di conseguenza, i tentativi di autenticazione tramite Workload Identity entro i primi secondi di vita di un pod potrebbero non riuscire. Puoi scoprire di più su questa limitazione qui.
Cloud Composer 2 utilizza cluster Autopilot che introducono la nozione di classi di calcolo. Per impostazione predefinita, se non è selezionata alcuna classe, viene utilizzata la classe
general-purpose
quando crei pod utilizzandoKubernetesPodOperator
.- Ogni classe è associata a proprietà e limiti delle risorse specifici. Scopri di più nella documentazione di Autopilot. Ad esempio, i pod eseguiti
all'interno della classe
general-purpose
possono utilizzare fino a 110 GiB di memoria.
- Ogni classe è associata a proprietà e limiti delle risorse specifici. Scopri di più nella documentazione di Autopilot. Ad esempio, i pod eseguiti
all'interno della classe
Configurazione di KubernetesPodOperator
Per continuare con questo esempio, inserisci l'intero file kubernetes_pod_operator.py
nella cartella dags/
del tuo ambiente o aggiungi il codice KubernetesPodOperator
pertinente a un DAG.
Le sezioni seguenti spiegano ogni configurazione KubernetesPodOperator
nell'esempio. Per informazioni su ogni variabile di configurazione, consulta la documentazione di riferimento di Airflow.
Configurazione minima
Per creare un KubernetesPodOperator
, sono richiesti solo i campi name
, namespace
del pod in cui eseguire
il pod, image
da utilizzare e task_id
.
Quando inserisci il seguente snippet di codice in un DAG, la configurazione utilizza i valori predefiniti in /home/airflow/composer_kube_config
. Non è necessario modificare il codice per completare l'attività pod-ex-minimum
.
Configurazione modello
Airflow supporta l'utilizzo di Jinja Templating.
Devi dichiarare le variabili richieste (task_id
, name
, namespace
e image
) con l'operatore. Come mostrato nell'esempio seguente, puoi
modellare tutti gli altri parametri con Jinja, tra cui cmds
, arguments
,
env_vars
e config_file
.
Se non modifichi il DAG o l'ambiente, l'attività ex-kube-templates
non va a buon fine a causa di due errori. I log mostrano che questa attività non è riuscita perché la variabile appropriata non esiste (my_value
). Il secondo errore, ottenuto dopo la correzione del primo errore, indica che l'attività non riesce perché core/kube_config
non è stato trovato in config
.
Per correggere entrambi gli errori, segui la procedura descritta in dettaglio.
Per impostare my_value
con gcloud
o la UI di Airflow:
UI di Airflow
Nell'interfaccia utente di Airflow 2:
Vai all'UI di Airflow.
Nella barra degli strumenti, seleziona Amministrazione > Variabili.
Nella pagina Variabile elenco, fai clic su Aggiungi un nuovo record.
Nella pagina Aggiungi variabile, inserisci le seguenti informazioni:
- Chiave:
my_value
- Valore:
example_value
- Chiave:
Fai clic su Salva.
gcloud
Per Airflow 2, inserisci il seguente comando:
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables set -- \
my_value example_value
Sostituisci:
ENVIRONMENT
con il nome dell'ambiente.LOCATION
con la regione in cui si trova l'ambiente.
Per fare riferimento a un elemento config_file
personalizzato (un file di configurazione di Kubernetes), esegui l'override dell'opzione di configurazione di Airflow da kube_config
a una configurazione Kubernetes valida:
Sezione | Chiave | Valore |
---|---|---|
core |
kube_config |
/home/airflow/composer_kube_config |
Attendi qualche minuto per l'aggiornamento dell'ambiente. Poi esegui di nuovo l'attività ex-kube-templates
e verifica che l'attività ex-kube-templates
sia riuscita.
Configurazione delle variabili secret
Un secret di Kubernetes
è un oggetto che contiene dati sensibili. Puoi passare i secret ai pod Kubernetes utilizzando KubernetesPodOperator
.
I secret devono essere definiti in Kubernetes, altrimenti il pod non si avvia.
Questo esempio mostra due modi di utilizzare i secret di Kubernetes: come variabile di ambiente e come volume montato dal pod.
Il primo secret, airflow-secrets
, è impostato su una variabile di ambiente Kubernetes denominata SQL_CONN
(e non su una variabile di ambiente Airflow o Cloud Composer).
Il secondo secret, service-account
, monta service-account.json
, un file con un token dell'account di servizio, in /var/secrets/google
.
Ecco come si presentano i segreti:
Il nome del primo secret di Kubernetes è definito nella variabile secret
.
Questo secret in particolare è denominato airflow-secrets
. È esposto come variabile di ambiente, come indicato da deploy_type
. La variabile di ambiente
che imposta, deploy_target
, è SQL_CONN
. Infine, il key
del secret archiviato nel deploy_target
è sql_alchemy_conn
.
Il nome del secondo secret di Kubernetes è definito nella variabile secret
.
Questo secret in particolare è denominato service-account
. Viene esposto come volume, come previsto da deploy_type
. Il percorso del file da montare, deploy_target
, è /var/secrets/google
. Infine, il key
del secret
archiviato nella deploy_target
è service-account.json
.
Ecco come si presenta la configurazione dell'operatore:
Se non vengono apportate modifiche al DAG o al tuo ambiente, l'attività ex-kube-secrets
non va a buon fine. Se esamini i log, l'attività non riesce a causa di un errore Pod took too long to start
. Questo errore si verifica perché Airflow
non riesce a trovare il secret specificato nella configurazione secret_env
.
gcloud
Per impostare il secret utilizzando gcloud
:
Recupera informazioni sul tuo cluster di ambiente Cloud Composer.
Esegui questo comando:
gcloud composer environments describe ENVIRONMENT \ --location LOCATION \ --format="value(config.gkeCluster)"
Sostituisci:
ENVIRONMENT
con il nome del tuo ambiente.LOCATION
con la regione in cui si trova l'ambiente Cloud Composer.
L'output di questo comando utilizza il seguente formato:
projects/<your-project-id>/locations/<location-of-composer-env>/clusters/<your-cluster-id>
.Per ottenere l'ID cluster GKE, copia l'output dopo
/clusters/
(termina con-gke
).
Esegui la connessione al tuo cluster GKE eseguendo questo comando:
gcloud container clusters get-credentials CLUSTER_ID \ --project PROJECT \ --region LOCATION
Sostituisci:
CLUSTER_ID
con l'ID del tuo cluster GKE.PROJECT
con l'ID del tuo progetto Google Cloud.LOCATION
con la regione in cui si trova l'ambiente Cloud Composer.
Creare secret Kubernetes.
Crea un secret Kubernetes che imposti il valore di
sql_alchemy_conn
sutest_value
eseguendo questo comando:kubectl create secret generic airflow-secrets \ --from-literal sql_alchemy_conn=test_value -n composer-user-workloads
Crea un secret Kubernetes che imposti il valore di
service-account.json
su un percorso locale di un file di chiavi dell'account di servizio chiamatokey.json
eseguendo questo comando:kubectl create secret generic service-account \ --from-file service-account.json=./key.json -n composer-user-workloads
Dopo aver impostato i secret, esegui di nuovo l'attività
ex-kube-secrets
nell'interfaccia utente di Airflow.Verifica che l'attività
ex-kube-secrets
sia riuscita.
Configurazione completa
Questo esempio mostra tutte le variabili che puoi configurare in KubernetesPodOperator
. Non è necessario modificare il codice per completare l'attività ex-all-configs
.
Per i dettagli su ogni variabile, consulta il
riferimento Airflow KubernetesPodOperator
.
Informazioni sul provider Kubernetes CNCF
GKEStartPodOperator e KubernetesPodOperator sono implementati all'interno del provider apache-airflow-providers-cncf-kubernetes
.
Per le note di rilascio non riuscite per il provider Kubernetes del CNCF, fai riferimento al sito web del provider Kubernetes di CNCF.
Versione 6.0.0
Nella versione 6.0.0 del pacchetto provider Kubernetes del CNCF,
la connessione kubernetes_default
viene utilizzata per impostazione predefinita in
KubernetesPodOperator
.
Se hai specificato una connessione personalizzata nella versione 5.0.0, questa connessione personalizzata viene ancora utilizzata dall'operatore. Per tornare a utilizzare la connessione kubernetes_default
, potresti voler modificare i DAG di conseguenza.
Versione 5.0.0
Questa versione introduce alcune modifiche incompatibili
con le versioni precedenti rispetto alla versione 4.4.0. Quelli più importanti, di cui dovresti essere a conoscenza, sono relativi alla connessione kubernetes_default
, che non viene utilizzata nella versione 5.0.0.
- La connessione
kubernetes_default
deve essere modificata: il percorso di configurazione kube deve essere impostato su/home/airflow/composer_kube_config
(vedi Figura 1) o, in alternativa,config_file
deve essere aggiunto alla configurazioneKubernetesPodOperator
(come presentato di seguito).
- Modifica il codice di un'attività utilizzando KubernetesPodOperator nel seguente modo
KubernetesPodOperator(
# config_file parameter - can be skipped if connection contains this setting
config_file="/home/airflow/composer_kube_config",
# definition of connection to be used by the operator
kubernetes_conn_id='kubernetes_default',
...
)
Per maggiori informazioni sulla versione 5.0.0, consulta le Note di rilascio del provider CNCF Kubernetes
Risoluzione dei problemi
Suggerimenti per la risoluzione degli errori dei pod
Oltre a controllare i log delle attività nella UI di Airflow, controlla anche i seguenti log:
Output dello scheduler e dei worker di Airflow:
Nella console Google Cloud, vai alla pagina Ambienti.
Segui il link dei DAG per il tuo ambiente.
Sali di un livello nel bucket del tuo ambiente.
Esamina i log nella cartella
logs/<DAG_NAME>/<TASK_ID>/<EXECUTION_DATE>
.
Log dettagliati dei pod nella console Google Cloud all'interno dei carichi di lavoro GKE. Questi log includono il file YAML di definizione dei pod, gli eventi dei pod e i dettagli dei pod.
Codici di reso diversi da zero quando utilizzi anche GKEStartPodOperator
Quando utilizzi KubernetesPodOperator
e GKEStartPodOperator
, il codice restituito del punto di ingresso del container determina se l'attività viene considerata riuscita o meno. I codici di ritorno diversi da zero indicano un errore.
Un pattern comune quando si utilizzano KubernetesPodOperator
e GKEStartPodOperator
è l'esecuzione di uno script shell come punto di ingresso del container per raggruppare più operazioni all'interno del container.
Se stai scrivendo uno script di questo tipo, ti consigliamo di includere
il comando set -e
nella parte superiore dello script
in modo che i comandi non riusciti nello script terminino lo script e
propaghino l'errore all'istanza dell'attività Airflow.
Timeout dei pod
Il timeout predefinito per KubernetesPodOperator
è 120 secondi, che può comportare timeout prima del download di immagini più grandi. Puoi
aumentare il timeout modificando il
parametro startup_timeout_seconds
quando crei KubernetesPodOperator
.
Quando si verifica il timeout di un pod, il log specifico dell'attività è disponibile nella UI di Airflow. Ad esempio:
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
I timeout dei pod possono verificarsi anche quando l'account di servizio Cloud Composer non dispone delle autorizzazioni IAM necessarie per eseguire l'attività. Per verificarlo, esamina gli errori a livello di pod utilizzando le dashboard di GKE per esaminare i log per il tuo carico di lavoro specifico, oppure utilizza Cloud Logging.
Impossibile stabilire una nuova connessione
L'upgrade automatico è abilitato per impostazione predefinita nei cluster GKE. Se un pool di nodi si trova in un cluster in fase di upgrade, potresti visualizzare il seguente errore:
<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused
Per verificare se è in corso l'upgrade del cluster, nella console Google Cloud vai alla pagina Cluster Kubernetes e cerca l'icona di caricamento accanto al nome del cluster del tuo ambiente.