Cette page explique comment utiliser KubernetesPodOperator
pour lancer les pods Kubernetes à partir de Cloud Composer dans le cluster Google Kubernetes Engine appartenant à votre environnement Cloud Composer et comment vous assurer que votre environnement dispose des ressources appropriées.
L'utilisation de KubernetesPodOperator
est recommandée si vous avez besoin :
- de dépendances Python personnalisées qui ne sont pas disponibles via le dépôt PyPI public ;
- de dépendances binaires qui ne sont pas disponibles dans l'image de nœud de calcul Cloud Composer issue de la banque d'images.
Cette page présente un exemple de DAG incluant les configurations KubernetesPodOperator
suivantes :
- Configuration minimale : définit seulement les paramètres obligatoires.
- Configuration du modèle : utilise des paramètres que vous pouvez modéliser avec Jinja.
- Configuration des variables 'secret' : transmet un objet secret Kubernetes au pod.
- Configuration de l'affinité des pods : limite les nœuds disponibles sur lesquels les pods sont programmés.
- Configuration complète : inclut toutes les configurations.
Avant de commencer
- Nous vous recommandons d'utiliser la dernière version de Cloud Composer. La version utilisée doit être au minimum incluse dans la stratégie d'obsolescence et de compatibilité de Cloud Composer. Pour vérifier la version de l'image, consultez les détails de l'environnement.
- Assurez-vous que votre environnement dispose de suffisamment de ressources. Le lancement de pods dans un environnement où les ressources sont insuffisantes peut entraîner des erreurs au niveau des nœuds de calcul et des programmeurs Airflow.
Garantir des ressources adaptées à votre environnement
Lorsque vous créez un environnement Cloud Composer, vous spécifiez la puissance de calcul pour l'environnement, et une certaine portion de cette puissance est allouée au cluster GKE. Le lancement de pods Kubernetes dans l'environnement peut entraîner une concurrence entre les programmes pour accéder à certaines ressources, comme le processeur ou la mémoire. Dans la mesure où le programmeur et les nœuds de calcul Airflow appartiennent au même cluster GKE, ils ne fonctionneront pas correctement si cette concurrence entraîne un épuisement des ressources.
Pour éviter une pénurie de ressources, prenez l'une ou plusieurs des mesures suivantes :
- Créez un pool de nœuds (option à privilégier).
- Augmentez le nombre de nœuds dans votre environnement.
- Spécifiez le type de machine approprié.
Créer un pool de nœuds
La méthode privilégiée pour éviter l'épuisement des ressources dans l'environnement Cloud Composer est de créer un pool de nœuds et de configurer les pods Kubernetes pour qu'ils s'exécutent en exploitant exclusivement les ressources de ce pool.
Pour créer un pool de nœuds dans un cluster existant, procédez comme suit :
Console
Dans Cloud Console, accédez au menu de GKE.
Sélectionnez le cluster souhaité.
Cliquez sur Modifier.
Dans Pools de nœuds, cliquez sur Ajouter un pool de nœuds.
Configurez votre pool de nœuds.
(Facultatif) Activez les options avancées, telles que les mises à jour automatiques et l'autoscaling.
Cliquez sur Save.
gcloud
Saisissez la commande suivante :
gcloud container node-pools create POOL_NAME \ --cluster CLUSTER_NAME \ --project PROJECT_ID \ --zone ZONE \ ADDITIONAL_FLAGS
Où :
POOL_NAME
correspond au nom souhaité du pool de nœuds.CLUSTER
correspond au nom du cluster dans lequel vous souhaitez créer le pool de nœuds.PROJECT_ID
correspond au nom du projet Cloud Composer.ZONE
correspond à la zone où se trouve le cluster GKE.Pour obtenir la liste des options, consultez la documentation sur la commande
gcloud container node-pools create
.Si l'opération aboutit, la requête
node-pools create
renvoie les informations relatives au pool de nœuds :Creating node pool example-pool...done. Created [https://container.googleapis.com/v1/projects/kubernetes-engine-docs/zones/us-central1-f/clusters/example-cluster/nodePools/example-pool]. NAME MACHINE_TYPE DISK_SIZE_GB NODE_VERSION example-pool n1-standard-1 100 1.2.4
Augmenter le nombre de nœuds dans votre environnement
L'augmentation du nombre de nœuds dans votre environnement Cloud Composer augmente la puissance de calcul disponible pour vos nœuds de calcul. Cette augmentation ne fournit pas de ressources supplémentaires pour les tâches qui nécessitent plus de processeur ou de RAM que le type de machine spécifié.
Pour augmenter le nombre de nœuds, mettez à jour votre environnement.
Spécifier le type de machine approprié
Lors de la création de l'environnement Cloud Composer, vous pouvez spécifier un type de machine. Pour garantir la disponibilité des ressources, spécifiez le type de machine idéal pour le type de calcul qui se produit dans votre environnement Cloud Composer.
Configuration de KubernetesPodOperator
Pour suivre cet exemple, placez l'intégralité du fichier kubernetes_pod_operator.py
dans le dossier/DAG de votre environnement ou ajoutez le code KubernetesPodOperator
pertinent à un DAG.
Les sections suivantes décrivent chacune des configurations de KubernetesPodOperator
de l'exemple. Pour plus d'informations sur chaque variable de configuration, consultez la documentation de référence Airflow.
Configuration minimale
Pour créer un opérateur KubernetesPodOperator, seules les variables name
, namespace
, image
et task_id
sont obligatoires.
Lorsque vous placez l'extrait de code suivant dans un DAG, la configuration utilise les valeurs par défaut dans /home/airflow/composer_kube_config
. Il n'est pas nécessaire de modifier le code pour que la tâche pod-ex-minimum
réussisse.
Configuration du modèle
Airflow est compatible avec la modélisation Jinja.
Vous devez déclarer les variables requises (task_id
, name
, namespace
, image
) avec l'opérateur. Comme le montre l'exemple suivant, vous pouvez modéliser tous les autres paramètres avec Jinja, y compris cmds
, arguments
, env_vars
et config_file
.
Sans modification du DAG ou de votre environnement, la tâche ex-kube-templates
échoue en raison de deux erreurs. Voyons maintenant comment déboguer la tâche et résoudre les erreurs.
- Vérifiez que la tâche
ex-kube-templates
échoue. - Vérifiez les journaux de la tâche
ex-kube-templates
.Les journaux indiquent que cette tâche échoue car la variable appropriée n'existe pas (
my_value
). Pour définir
my_value
avecgcloud
ou l'interface utilisateur d'Airflow, procédez comme suit :gcloud
Saisissez la commande suivante :
gcloud composer environments run ENVIRONMENT \ --location LOCATION \ variables -- \ --set my_value example_value
Où :
ENVIRONMENT
correspond au nom de l'environnement Cloud Composer.LOCATION
correspond à la région où se trouve l'environnement Cloud Composer
Interface utilisateur d'Airflow
- Dans la barre d'outils, cliquez sur Admin > Variables (Administration > Variables).
- Cliquez sur Create (Créer).
- Saisissez les informations suivantes :
- Key (Clé) :
my_value
- Val (Valeur) :
example_value
- Key (Clé) :
- Cliquez sur Enregistrer.
Vérifiez l'état de la tâche
ex-kube-templates
.La tâche
ex-kube-templates
continue d'échouer. Si vous consultez les journaux, la tâche échoue désormais carcore/kubeconfig
est introuvable dansconfig
. Pour faire référence à unconfig_file
personnalisé (fichier de configuration Kubernetes), vous devez définir la variablekube_config
du fichierairflow.cfg
sur une configuration Kubernetes valide.Pour définir la variable
kube_config
, entrez la commande suivante :gcloud composer environments update ENVIRONMENT \ --location LOCATION \ --update-airflow-configs=core-kube_config=/home/airflow/composer_kube_config
Où :
ENVIRONMENT
correspond au nom de l'environnement Cloud Composer.LOCATION
correspond à la région où se trouve l'environnement Cloud Composer.
Patientez quelques minutes pendant la mise à jour de votre environnement.
Exécutez à nouveau la tâche
ex-kube-templates
.
Configuration des variables "secret"
Un secret Kubernetes est un objet qui contient une petite quantité de données sensibles.
Vous pouvez transmettre des secrets aux pods Kubernetes à l'aide de KubernetesPodOperator
.
Les secrets doivent être définis dans Kubernetes, faute de quoi le lancement du pod est impossible.
Dans cet exemple, nous déployons le secret Kubernetes airflow-secrets
, dans une variable d'environnement Kubernetes nommée SQL_CONN
(par opposition à une variable d'environnement Airflow ou Cloud Composer).
Voici à quoi ressemble ce secret :
Le nom du secret Kubernetes est défini dans la variable secret
.
Ce secret particulier est nommé airflow-secrets
. Il est exposé en tant que variable d'environnement, comme indiqué par deploy_type
. La variable d'environnement sur laquelle il est défini, deploy_target
, est SQL_CONN
. Enfin, la clé (key
) du secret que nous stockons dans deploy_target
est sql_alchemy_conn
.
Voici à quoi ressemble la configuration de l'opérateur :
Sans modification du DAG ou de votre environnement, la tâche ex-kube-secrets
échoue. Voyons maintenant comment déboguer la tâche et résoudre les erreurs.
- Vérifiez que la tâche
ex-kube-secrets
échoue. - Vérifiez les journaux de la tâche
ex-kube-secrets
.En consultant les journaux, vous pourrez constater que la tâche échoue en raison d'une erreur
Pod took too long to start
. Cette erreur se produit car Airflow ne trouve pas le secret spécifié dans la configuration,secret_env
. Pour définir le secret à l'aide de
gcloud
, procédez comme suit :Pour afficher les détails de votre environnement Cloud Composer, exécutez la commande suivante :
gcloud composer environments describe ENVIRONMENT \ --location LOCATION \ --format="value(config.gkeCluster)"
où :
ENVIRONMENT
correspond au nom de l'environnement Cloud Composer.LOCATION
correspond à la région où se trouve l'environnement Cloud ComposerLa sortie doit être semblable à ceci :
projects/<your-project-id>/zones/<zone-of-composer-env>/clusters/<your-cluster-id>
Pour obtenir l'ID du cluster GKE, copiez et collez la sortie après
/clusters/
(se termine par-gke
) dans un endroit où vous pourrez la récupérer plus tard. Cette sortie correspond à votre identifiant de cluster.Pour obtenir la zone, copiez et collez la sortie après
/zones/
dans un endroit où vous pourrez la récupérer plus tard.Connectez-vous à votre cluster GKE en exécutant la commande suivante :
gcloud container clusters get-credentials CLUSTER_ID \ --zone ZONE \ --project PROJECT
Où :
CLUSTER_ID
correspond à votre ID de cluster GKE.ZONE
correspond à la zone où se trouve votre GKE.PROJECT
est l'ID de votre projet Google Cloud.
Créez un secret Kubernetes qui définit la valeur de
sql_alchemy_conn
surtest_value
en exécutant la commande suivante :kubectl create secret generic airflow-secrets \ --from-literal sql_alchemy_conn=test_value
Après avoir défini le secret, exécutez à nouveau la tâche
ex-kube-secrets
.
Configuration de l'affinité du pod
Lorsque vous configurez le paramètre affinity
dans KubernetesPodOperator
, vous contrôlez les nœuds sur lesquels les pods sont programmés, par exemple les nœuds d'un pool de nœuds spécifique.
Dans cet exemple, l'opérateur ne s'exécute que sur les pools de nœuds pool-0
et pool-1
.
Tel que l'exemple est actuellement configuré, la tâche échoue. Voyons maintenant comment déboguer la tâche et résoudre les erreurs.
- Vérifiez que la tâche
ex-pod-affinity
échoue. - Vérifiez les journaux de la tâche
ex-pod-affinity
.En consultant les journaux, vous pourrez constater que la tâche échoue car les pools de nœuds
pool-0
etpool-1
n'existent pas. Pour vérifier que les pools de nœuds existent dans
values
, effectuez l'une des modifications de configuration suivantes :- Si vous avez créé un pool de nœuds précédemment, remplacez
pool-0
etpool-1
par les noms de vos pools de nœuds, puis importez à nouveau votre DAG. - Créez un pool de nœuds nommé
pool-0
oupool-1
. Vous pouvez créer les deux, mais la tâche n'a besoin que d'un pool pour réussir. - Remplacez
pool-0
etpool-1
pardefault-pool
, qui est le pool par défaut utilisé par Airflow. Ensuite, importez à nouveau votre DAG. Remarque : Par défaut, les pods Kubernetes sont programmés dansdefault-pool
. Si vous ajoutez des pools ultérieurement, ceux-ci seront limités àdefault-pool
.
- Si vous avez créé un pool de nœuds précédemment, remplacez
Patientez quelques minutes pendant la mise à jour de votre environnement.
Exécutez à nouveau la tâche
ex-pod-affinity
.
Configuration complète
Cet exemple présente toutes les variables que vous pouvez configurer dans KubernetesPodOperator
. Il n'est pas nécessaire de modifier le code pour que la tâche ex-all-configs
réussisse.
Pour en savoir plus sur chaque variable, consultez la documentation de référence Airflow au sujet de KubernetesPodOperator
.
Gérer les DAG
Afficher l'état d'une tâche
- Accédez à l'interface Web d'Airflow.
- Sur la page DAG, cliquez sur le nom du DAG, par exemple
composer_sample_kubernetes_pod
. - Sur la page des informations relatives au DAG, cliquez sur Graph View (Vue graphique).
Vérifiez l'état :
Échec : la tâche apparaît dans un encadré rouge, comme pour
ex-kube-templates
ci-dessous. Vous pouvez également placer le pointeur sur la tâche pour voir s'afficher la mention State: Failed (État : Échec).Réussite : la tâche apparaît dans un encadré vert, comme pour
pod-ex-minimum
ci-dessous. Vous pouvez également placer le pointeur sur la tâche pour voir s'afficher la mention State: Success (État : Réussite).
Vérifier les journaux d'une tâche
- Dans l'interface utilisateur d'Airflow, affichez l'état de la tâche.
- Dans la vue graphique du DAG, cliquez sur le nom de la tâche.
- Dans le menu contextuel des instances de la tâche, cliquez sur View Log (Afficher le journal).
Ré-exécuter une tâche
- Pour revenir au DAG :
- Dans la barre d'outils de l'interface utilisateur d'Airflow, cliquez sur DAG.
- Cliquez sur le nom du DAG, par exemple
composer_samples_kubernetes_pod
.
- Pour exécuter la tâche à nouveau, procédez comme suit :
- Cliquez sur le nom de la tâche.
- Cliquez sur Clear (Effacer), puis sur OK. La tâche s'exécute à nouveau automatiquement.
Dépannage
Conseils pour résoudre les problèmes liés aux pods
En plus de vérifier les journaux des tâches, vous devez également consulter les journaux suivants :
Sortie du programmeur et des nœuds de calcul Airflow :
- Accédez au bucket Cloud Storage de l'environnement Cloud Composer. Il s'agit du bucket dans lequel se trouvent les DAG.
- Examinez les journaux sous
logs/
DAG_NAME
/
TASK_ID
/
EXECUTION_DATE
.
Journaux détaillés des pods dans Cloud Console, dans les charges de travail GKE. Ces journaux incluent le fichier YAML de définition des pods, les événements des pods et les détails des pods.
Codes de retour non nuls si GKEPodOperator
est également utilisé
Lorsque vous utilisez KubernetesPodOperator
et GKEPodOperator
, le code de retour du point d'entrée du conteneur détermine si la tâche est considérée comme réussie ou non. Les codes de retour non nuls indiquent un échec.
Un schéma courant lors de l'utilisation de KubernetesPodOperator
et de GKEPodOperator
consiste à exécuter un script shell comme point d'entrée du conteneur pour regrouper plusieurs opérations dans le conteneur.
Si vous écrivez un script de ce type, nous vous recommandons d'inclure la commande set -e
en haut du script afin que les commandes ayant échoué dans le script arrêtent le script et propagent l'échec à l'instance de tâche Airflow.
La tâche échoue malgré la réussite du pod
Dans les environnements Cloud Composer qui exécutent composer-1.4.1-airflow-*
ou une version antérieure, procédez comme suit :
Si une tâche Airflow s'exécute pendant une heure et que les journaux de la tâche se terminent par kubernetes.client.rest.ApiException: (401)
et Reason: Unauthorized
, la tâche Kubernetes sous-jacente continue de s'exécuter après ce point et peut même réussir. Cependant, Airflow signale que la tâche a échoué.
Pour résoudre ce problème, ajoutez une dépendance explicite du package PyPI sur kubernetes>=8.0.1
.
Délais d'inactivité des pods
Le délai d'inactivité par défaut de KubernetesPodOperator
est de 120 secondes, ce qui signifie que ce délai peut être dépassé avant que des images volumineuses ne puissent être téléchargées. Vous pouvez augmenter le délai d'inactivité en modifiant le paramètre startup_timeout_seconds
lorsque vous créez l'opérateur KubernetesPodOperator
.
Lorsque le délai d'un pod expire, le journal spécifique à la tâche est disponible dans l'interface utilisateur Web d'Airflow. Exemple :
Executingon 2018-07-23 19:06:58.133811 Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py'] Event: pod-name-9a8e9d06 had an event of type Pending ... ... Event: pod-name-9a8e9d06 had an event of type Pending Traceback (most recent call last): File "/usr/local/bin/airflow", line 27, in args.func(args) File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run pool=args.pool, File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper result = func(*args, **kwargs) File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task result = task_copy.execute(context=context) File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute raise AirflowException('Pod Launching failed: {error}'.format(error=ex)) airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
Un dépassement du délai d'inactivité d'un pod peut également se produire lorsque le compte de service Composer ne dispose pas des autorisations IAM nécessaires pour effectuer la tâche en question. Pour vérifier cela, examinez les erreurs au niveau du pod à l'aide des tableaux de bord GKE, afin de consulter les journaux de votre charge de travail en question, ou utilisez Stackdriver Logging.
L'établissement d'une nouvelle connexion a échoué
La mise à niveau automatique est activée par défaut dans les clusters GKE. Si un pool de nœuds se trouve dans un cluster en cours de mise à niveau, le message d'erreur suivant peut s'afficher :
<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new connection: [Errno 111] Connection refuse
Pour vérifier si votre cluster effectue une mise à niveau, accédez au menu GKE dans Cloud Console et recherchez l'icône de chargement à côté du nom du cluster de votre environnement.
Ressources associées
- Les bases de Kubernetes
- Suivez le tutoriel Airflow.
- Obtenez plus d'informations sur la programmation et les déclencheurs d'Airflow.