Cloud Composer 1 | Cloud Composer 2
Cette page explique comment utiliser KubernetesPodOperator
pour déployer des pods Kubernetes à partir de Cloud Composer dans le cluster Google Kubernetes Engine faisant partie de votre environnement Cloud Composer et pour vous assurer que votre environnement dispose des ressources appropriées.
KubernetesPodOperator
lance les pods Kubernetes
dans le cluster de votre environnement. Les opérateurs Google Kubernetes Engine, quant à eux, exécutent des pods Kubernetes dans un cluster spécifié, qui peut être un cluster distinct et non lié à votre environnement. Vous pouvez également créer et supprimer des clusters à l'aide d'opérateurs Google Kubernetes Engine.
L'utilisation de KubernetesPodOperator
est recommandée si vous avez besoin :
- de dépendances Python personnalisées qui ne sont pas disponibles via le dépôt PyPI public ;
- de dépendances binaires qui ne sont pas disponibles dans l'image de nœud de calcul Cloud Composer issue de la banque d'images.
Cette page présente un exemple de DAG Airflow qui inclut les configurations KubernetesPodOperator
suivantes:
- Configuration minimale : définit seulement les paramètres obligatoires.
- Configuration du modèle : utilise des paramètres que vous pouvez modéliser avec Jinja.
- Configuration des variables 'secret' : transmet un objet secret Kubernetes au pod.
Configuration de l'affinité des pods : limite les nœuds disponibles sur lesquels les pods sont programmés.
Configuration complète : inclut toutes les configurations.
Avant de commencer
- Nous vous recommandons d'utiliser la dernière version de Cloud Composer. Cette version doit être au minimum compatible avec le règlement sur les abandons de versions et la compatibilité.
- Assurez-vous que votre environnement dispose de suffisamment de ressources. Le lancement de pods dans un environnement où les ressources sont insuffisantes peut entraîner des erreurs au niveau des nœuds de calcul et des programmeurs Airflow.
Configurer les ressources de votre environnement Cloud Composer
Lorsque vous créez un environnement Cloud Composer, vous spécifiez ses paramètres de performance, y compris les paramètres de performance du cluster de l'environnement. Le lancement de pods Kubernetes dans le cluster de l'environnement peut entraîner une concurrence pour les ressources de cluster, telles que le processeur ou la mémoire. Dans la mesure où le programmeur et les nœuds de calcul Airflow appartiennent au même cluster GKE, ils ne fonctionneront pas correctement si cette concurrence entraîne un épuisement des ressources.
Pour éviter une pénurie de ressources, prenez l'une ou plusieurs des mesures suivantes :
- (Recommandé) Créez un pool de nœuds
- Augmentez le nombre de nœuds dans votre environnement.
- Spécifiez le type de machine approprié.
Créer un pool de nœuds
La méthode privilégiée pour éviter l'épuisement des ressources dans l'environnement Cloud Composer est de créer un pool de nœuds et de configurer les pods Kubernetes pour qu'ils s'exécutent en exploitant exclusivement les ressources de ce pool.
Console
Dans la console Google Cloud, accédez à la page Environnements.
Cliquez sur le nom de votre environnement.
Sur la page Détails de l'environnement, accédez à l'onglet Configuration de l'environnement.
Dans la section Ressources > Cluster GKE, suivez le lien Afficher les détails du cluster.
Créez un pool de nœuds comme décrit dans la section Ajouter un pool de nœuds.
gcloud
Déterminez le nom du cluster de votre environnement:
gcloud composer environments describe ENVIRONMENT_NAME \ --location LOCATION \ --format="value(config.gkeCluster)"
Remplacez :
ENVIRONMENT_NAME
par le nom de l'environnement.LOCATION
par la région dans laquelle se trouve l'environnement.
La sortie contient le nom du cluster de votre environnement. Par exemple,
europe-west3-example-enviro-af810e25-gke
.Créez un pool de nœuds comme décrit dans la section Ajouter un pool de nœuds.
Augmentez le nombre de nœuds dans votre environnement.
L'augmentation du nombre de nœuds dans votre environnement Cloud Composer augmente la puissance de calcul disponible pour vos charges de travail. Cette augmentation ne fournit pas de ressources supplémentaires pour les tâches qui nécessitent plus de processeur ou de RAM que le type de machine spécifié.
Pour augmenter le nombre de nœuds, mettez à jour votre environnement.
Spécifier le type de machine approprié
Lors de la création de l'environnement Cloud Composer, vous pouvez spécifier un type de machine. Pour garantir la disponibilité des ressources, spécifiez un type de machine pour le type de calcul qui se produit dans votre environnement Cloud Composer.
Configuration de KubernetesPodOperator
Pour suivre cet exemple, placez l'intégralité du fichier kubernetes_pod_operator.py
dans le dossier dags/
de votre environnement ou ajoutez le code pertinent KubernetesPodOperator
à un DAG.
Les sections suivantes décrivent chacune des configurations de KubernetesPodOperator
de l'exemple. Pour plus d'informations sur chaque variable de configuration, consultez la documentation de référence Airflow.
Airflow 2
Airflow 1
Configuration minimale
Pour créer un KubernetesPodOperator
, seuls les éléments name
du pod, namespace
où exécuter le pod, image
et task_id
sont requis.
Lorsque vous placez l'extrait de code suivant dans un DAG, la configuration utilise les valeurs par défaut dans /home/airflow/composer_kube_config
. Il n'est pas nécessaire de modifier le code pour que la tâche pod-ex-minimum
réussisse.
Airflow 2
Airflow 1
Configuration du modèle
Airflow est compatible avec la modélisation Jinja.
Vous devez déclarer les variables requises (task_id
, name
, namespace
, image
) avec l'opérateur. Comme le montre l'exemple suivant, vous pouvez modéliser tous les autres paramètres avec Jinja, y compris cmds
, arguments
, env_vars
et config_file
.
Airflow 2
Airflow 1
Sans modification du DAG ou de votre environnement, la tâche ex-kube-templates
échoue en raison de deux erreurs. Les journaux indiquent que cette tâche échoue car la variable appropriée n'existe pas (my_value
). La deuxième erreur, que vous pouvez obtenir après avoir corrigé la première erreur, indique que la tâche échoue car core/kube_config
est introuvable dans config
.
Pour corriger ces deux erreurs, suivez les étapes décrites plus loin.
Pour définir my_value
avec gcloud
ou l'interface utilisateur d'Airflow, procédez comme suit :
Interface utilisateur d'Airflow
Dans l'interface utilisateur d'Airflow 2:
Accédez à l'interface utilisateur d'Airflow.
Dans la barre d'outils, sélectionnez Admin > Variables (Administration > Variables).
Sur la page List Variable (Variable de liste), cliquez sur Add a new record (Ajouter un enregistrement).
Sur la page Add Variable (Ajouter une variable), saisissez les informations suivantes :
- Key (Clé) :
my_value
- Val (Valeur) :
example_value
- Key (Clé) :
Cliquez sur Enregistrer.
Dans l'interface utilisateur d'Airflow 1:
Accédez à l'interface utilisateur d'Airflow.
Dans la barre d'outils, sélectionnez Admin > Variables (Administration > Variables).
Sur la page Variables, cliquez sur l'onglet Create (Créer).
Sur la page Variable, saisissez les informations suivantes :
- Key (Clé) :
my_value
- Val (Valeur) :
example_value
- Key (Clé) :
Cliquez sur Enregistrer.
gcloud
Pour Airflow 2, saisissez la commande suivante :
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables set -- \
my_value example_value
Pour Airflow 1, saisissez la commande suivante :
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables -- \
--set my_value example_value
Remplacez :
ENVIRONMENT
par le nom de l'environnement.LOCATION
par la région dans laquelle se trouve l'environnement.
Pour faire référence à un config_file
personnalisé (fichier de configuration Kubernetes), remplacez l'option de configuration Airflow kube_config
par une configuration Kubernetes valide:
Section | Clé | Valeur |
---|---|---|
core |
kube_config |
/home/airflow/composer_kube_config |
Patientez quelques minutes pendant la mise à jour de votre environnement. Exécutez ensuite à nouveau la tâche ex-kube-templates
et vérifiez que la tâche ex-kube-templates
réussit.
Configuration des variables de secret
Un secret Kubernetes est un objet qui contient des données sensibles. Vous pouvez transmettre des secrets aux pods Kubernetes à l'aide de KubernetesPodOperator
.
Les secrets doivent être définis dans Kubernetes, faute de quoi le lancement du pod est impossible.
Cet exemple présente deux façons d'utiliser les secrets Kubernetes: en tant que variable d'environnement et en tant que volume installé par le pod.
Le premier secret, airflow-secrets
, est défini sur une variable d'environnement Kubernetes nommée SQL_CONN
(par opposition à une variable d'environnement Airflow ou Cloud Composer).
Le second secret, service-account
, installe service-account.json
, un fichier contenant un jeton de compte de service, dans /var/secrets/google
.
Voici à quoi ressemblent ces secrets :
Airflow 2
Airflow 1
Le nom du premier secret Kubernetes est défini dans la variable secret
.
Ce secret particulier est nommé airflow-secrets
. Il est exposé en tant que variable d'environnement, comme indiqué par deploy_type
. La variable d'environnement sur laquelle il est défini, deploy_target
, est SQL_CONN
. Enfin, la clé (key
) du secret stocké dans deploy_target
est sql_alchemy_conn
.
Le nom du second secret Kubernetes est défini dans la variable secret
.
Ce secret particulier est nommé service-account
. Il est exposé en tant que volume, comme indiqué par deploy_type
. Le chemin d'accès au fichier à installer, deploy_target
, est /var/secrets/google
. Enfin, la clé (key
) du secret stocké dans deploy_target
est service-account.json
.
La configuration de l'opérateur se présente comme suit :
Airflow 2
Airflow 1
Sans modification du DAG ou de votre environnement, la tâche ex-kube-secrets
échoue. Si vous consultez les journaux, la tâche échoue en raison d'une erreur Pod took too long to start
. Cette erreur se produit car Airflow ne trouve pas le secret spécifié dans la configuration, secret_env
.
gcloud
Pour définir le secret à l'aide de gcloud
, procédez comme suit :
Obtenez des informations sur le cluster de votre environnement Cloud Composer.
Exécutez la commande suivante :
gcloud composer environments describe ENVIRONMENT \ --location LOCATION \ --format="value(config.gkeCluster)"
Remplacez :
ENVIRONMENT
par le nom de votre environnement.LOCATION
par la région où se trouve l'environnement Cloud Composer.
Le résultat de cette commande utilise le format suivant :
projects/<your-project-id>/zones/<zone-of-composer-env>/clusters/<your-cluster-id>
.Pour obtenir l'ID du cluster GKE, copiez le résultat après
/clusters/
(se termine par-gke
).Pour obtenir la zone, copiez le résultat après
/zones/
.
Connectez-vous à votre cluster GKE en exécutant la commande suivante :
gcloud container clusters get-credentials CLUSTER_ID \ --project PROJECT \ --zone ZONE
Remplacez :
CLUSTER_ID
par votre ID de cluster GKE.PROJECT
par l'ID de votre projet Google Cloud.ZONE
par la zone où se trouve votre GKE.
Créer des secrets Kubernetes
Créez un secret Kubernetes qui définit la valeur de
sql_alchemy_conn
surtest_value
en exécutant la commande suivante :kubectl create secret generic airflow-secrets \ --from-literal sql_alchemy_conn=test_value
Créez un secret Kubernetes qui définit la valeur de
service-account.json
sur le chemin d'accès local d'un fichier de clé de compte de service appelékey.json
en exécutant la commande suivante :kubectl create secret generic service-account \ --from-file service-account.json=./key.json
Après avoir défini les secrets, exécutez à nouveau la tâche
ex-kube-secrets
dans l'interface utilisateur d'Airflow.Vérifiez que la tâche
ex-kube-secrets
réussit.
Configuration de l'affinité des pods
Lorsque vous configurez le paramètre affinity
dans KubernetesPodOperator
, vous contrôlez les nœuds sur lesquels les pods sont programmés, par exemple les nœuds d'un pool de nœuds spécifique. Dans cet exemple, l'opérateur ne s'exécute que sur les pools de nœuds pool-0
et pool-1
. Vos nœuds d'environnement Cloud Composer 1 se trouvant dans default-pool
, vos pods ne s'exécutent pas sur les nœuds de votre environnement.
Airflow 2
Airflow 1
Tel que l'exemple est actuellement configuré, la tâche échoue. Si vous consultez les journaux, la tâche échoue car les pools de nœuds pool-0
et pool-1
n'existent pas.
Pour vérifier que les pools de nœuds existent dans values
, effectuez l'une des modifications de configuration suivantes :
Si vous avez créé un pool de nœuds précédemment, remplacez
pool-0
etpool-1
par les noms de vos pools de nœuds, puis importez à nouveau votre DAG.Créez un pool de nœuds nommé
pool-0
oupool-1
. Vous pouvez créer les deux, mais la tâche n'a besoin que d'un pool pour réussir.Remplacez
pool-0
etpool-1
pardefault-pool
, qui est le pool par défaut utilisé par Airflow. Ensuite, importez à nouveau votre DAG.
Une fois les modifications effectuées, attendez quelques minutes que votre environnement soit mis à jour.
Exécutez ensuite à nouveau la tâche ex-pod-affinity
et vérifiez que la tâche ex-pod-affinity
réussit.
Configuration complète
Cet exemple présente toutes les variables que vous pouvez configurer dans KubernetesPodOperator
. Il n'est pas nécessaire de modifier le code pour que la tâche ex-all-configs
réussisse.
Pour en savoir plus sur chaque variable, consultez la documentation de référence Airflow au sujet de KubernetesPodOperator
.
Airflow 2
Airflow 1
Informations sur le fournisseur Kubernetes CNCF
GKEStartPodOperator et KubernetesPodOperator sont implémentés dans le fournisseur apache-airflow-providers-cncf-kubernetes
.
Pour obtenir les notes de version abandonnées pour le fournisseur Kubernetes CNCF, consultez le site Web du fournisseur Kubernetes CNCF.
Version 6.0.0
Dans la version 6.0.0 du package de fournisseur Kubernetes CNCF, la connexion kubernetes_default
est utilisée par défaut dans KubernetesPodOperator
.
Si vous avez spécifié une connexion personnalisée dans la version 5.0.0, cette connexion personnalisée est toujours utilisée par l'opérateur. Pour revenir à l'utilisation de la connexion kubernetes_default
, vous pouvez ajuster vos DAG en conséquence.
Version 5.0.0
Cette version introduit quelques modifications incompatibles avec les versions antérieures par rapport à la version 4.4.0. Les plus importants, que vous devez connaître, sont liés à la connexion kubernetes_default
, qui n'est pas utilisée dans la version 5.0.0.
La connexion
kubernetes_default
doit être modifiée : le chemin de configuration du kube doit être défini sur/home/airflow/composer_kube_config
ouconfig_file
doit être ajouté à la configurationKubernetesPodOperator
(comme indiqué ci-dessous).Procédez comme suit pour modifier le code d'une tâche à l'aide de KubernetesPodOperator
KubernetesPodOperator(
# config_file parameter - can be skipped if connection contains this setting
config_file="/home/airflow/composer_kube_config",
# definition of connection to be used by the operator
kubernetes_conn_id='kubernetes_default',
...
)
Pour en savoir plus sur la version 5.0.0, consultez les notes de version du fournisseur Kubernetes CNCF.
Dépannage
Conseils pour résoudre les problèmes liés aux pods
En plus de consulter les journaux des tâches dans l'interface utilisateur d'Airflow, vous devez également consulter les journaux suivants:
Sortie du programmeur et des nœuds de calcul Airflow :
Dans la console Google Cloud, accédez à la page Environnements.
Suivez le lien DAG pour votre environnement.
Dans le bucket de votre environnement, augmentez un niveau.
Examinez les journaux dans le dossier
logs/<DAG_NAME>/<TASK_ID>/<EXECUTION_DATE>
.
Journaux détaillés des pods dans la console Google Cloud sous les charges de travail GKE. Ces journaux incluent le fichier YAML de définition des pods, les événements des pods et les détails des pods.
Codes de retour non nuls si GKEStartPodOperator
est également utilisé
Lorsque vous utilisez KubernetesPodOperator
et GKEStartPodOperator
, le code de retour du point d'entrée du conteneur détermine si la tâche est considérée comme réussie ou non. Les codes de retour non nuls indiquent un échec.
Un schéma courant lors de l'utilisation de KubernetesPodOperator
et de GKEStartPodOperator
consiste à exécuter un script shell comme point d'entrée du conteneur pour regrouper plusieurs opérations dans le conteneur.
Si vous écrivez un script de ce type, nous vous recommandons d'inclure la commande set -e
en haut du script afin que les commandes ayant échoué dans le script arrêtent le script et propagent l'échec à l'instance de tâche Airflow.
Délais d'inactivité des pods
Le délai d'inactivité par défaut de KubernetesPodOperator
est de 120 secondes, ce qui signifie que ce délai peut être dépassé avant que des images volumineuses ne puissent être téléchargées. Vous pouvez augmenter le délai d'inactivité en modifiant le paramètre startup_timeout_seconds
lorsque vous créez l'opérateur KubernetesPodOperator
.
Lorsqu'un pod expire, le journal spécifique à la tâche est disponible dans l'interface utilisateur d'Airflow. Exemple :
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
Un dépassement du délai d'inactivité d'un pod peut également se produire lorsque le compte de service Composer ne dispose pas des autorisations IAM nécessaires pour effectuer la tâche en question. Pour vérifier cela, examinez les erreurs au niveau du pod à l'aide des tableaux de bord GKE, afin de consulter les journaux de votre charge de travail en question, ou utilisez Cloud Logging.
Échec de l'établissement d'une nouvelle connexion
La mise à niveau automatique est activée par défaut dans les clusters GKE. Si un pool de nœuds se trouve dans un cluster en cours de mise à niveau, le message d'erreur suivant peut s'afficher :
<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused
Pour vérifier si votre cluster est en cours de mise à niveau, dans la console Google Cloud, accédez à la page Clusters Kubernetes et recherchez l'icône de chargement à côté du nom du cluster de votre environnement.