Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Cette page explique comment utiliser KubernetesPodOperator
pour déployer
Pods Kubernetes
de Cloud Composer vers Google Kubernetes Engine
qui fait partie de votre environnement Cloud Composer et pour vous assurer
que votre environnement dispose des ressources appropriées.
KubernetesPodOperator
lancements
Pods Kubernetes
du cluster de votre environnement. En comparaison,
Les opérateurs Google Kubernetes Engine exécutent des pods Kubernetes dans un
qui peut être un cluster distinct, sans lien avec votre
environnement. Vous pouvez aussi créer et supprimer des clusters
Opérateurs Google Kubernetes Engine.
L'utilisation de KubernetesPodOperator
est recommandée si vous avez besoin :
- de dépendances Python personnalisées qui ne sont pas disponibles via le dépôt PyPI public ;
- de dépendances binaires qui ne sont pas disponibles dans l'image de nœud de calcul Cloud Composer issue de la banque d'images.
Cette page présente un exemple de DAG Airflow qui inclut les éléments suivants :
Configurations KubernetesPodOperator
:
- Configuration minimale : définit seulement les paramètres obligatoires.
- Configuration du modèle : utilise des paramètres que vous pouvez modéliser avec Jinja.
Configuration des variables 'secret' : transmet un objet secret Kubernetes au pod.
Dans Cloud Composer 2, la configuration de l'affinité de pod n'est pas disponible. Utilisez plutôt les opérateurs GKE pour lancer les pods dans un autre cluster.
Configuration complète : inclut toutes les configurations.
Avant de commencer
Dans Cloud Composer 2, le cluster de votre environnement évolue automatiquement. Les charges de travail supplémentaires que vous exécutez à l'aide de
KubernetesPodOperator
évoluent indépendamment de votre environnement. Votre environnement n'est pas affecté par l'augmentation de la demande de ressources, mais son cluster peut évoluer à la hausse en fonction de la demande de ressources. Les tarifs des charges de travail supplémentaires que vous exécutez dans votre de l'environnement d'exécution Modèle de tarification de Cloud Composer 2 et utilise SKU pour les solutions de calcul Cloud Composer :Cloud Composer 2 utilise des clusters GKE avec Fédération d'identité de charge de travail pour GKE Par défaut, les pods qui s'exécutent dans un espace de noms nouvellement créé ou la
composer-user-workloads
ne peut pas accéder aux ressources Google Cloud. Lorsque vous utilisez Fédération d'identité de charge de travail pour GKE, comptes de service Kubernetes associés à doivent être mappés à des comptes de service Google Cloud pour l'autorisation d'identité de service pour les requêtes adressées aux API Google et à d'autres services.De ce fait, si vous exécutez des pods dans l'espace de noms
composer-user-workloads
, ou un espace de noms nouvellement créé dans le cluster de votre environnement, Liaisons IAM entre Kubernetes et Google Cloud les comptes de service ne sont pas créés et ces pods ne peuvent pas accéder aux ressources de votre projet Google Cloud.Si vous voulez que vos pods aient accès aux ressources Google Cloud, utilisez l'espace de noms
composer-user-workloads
ou créez le vôtre comme décrit plus loin.Pour fournir un accès aux ressources de votre projet, Suivez les instructions de la section "Fédération d'identité de charge de travail pour GKE". et configurez les liaisons:
- Créez un espace de noms distinct dans le cluster de votre environnement.
- Créez une liaison entre les
composer-user-workloads/<namespace_name>
compte de service Kubernetes et le compte de service de votre environnement. - Ajoutez l'annotation de compte de service de votre environnement au compte de service Kubernetes.
- Lorsque vous utilisez
KubernetesPodOperator
, spécifiez l'espace de noms et le compte de service Kubernetes dans les paramètresnamespace
etservice_account_name
.
Cloud Composer 2 utilise des clusters GKE avec Workload Identité. Le serveur de métadonnées GKE met quelques secondes pour commencer à accepter des requêtes sur un pod créé récemment. Par conséquent, tente de s'authentifier à l'aide de Workload Identity dans les premières secondes La vie du pod peut échouer. Pour en savoir plus sur cette limitation, consultez Restrictions de Workload Identity
Cloud Composer 2 utilise des clusters Autopilot, qui introduisent la notion de classes de calcul:
Par défaut, si aucune classe n'est sélectionnée, la classe
general-purpose
est supposée lorsque vous créez des pods à l'aide deKubernetesPodOperator
.Chaque classe est associée à des propriétés et limites de ressources spécifiques, Pour en savoir plus à leur sujet, consultez Documentation Autopilot. Par exemple, les pods qui s'exécutent dans la classe
general-purpose
peuvent utiliser et jusqu'à 110 Gio de mémoire.
Si vous utilisez la version 5.0.0 du fournisseur CNCF Kubernetes, suivez les instructions section CNCF du fournisseur Kubernetes.
Configuration de KubernetesPodOperator
Pour suivre cet exemple, placez l'intégralité du fichier kubernetes_pod_operator.py
dans le dossier dags/
de votre environnement ou ajoutez le code pertinent KubernetesPodOperator
à un DAG.
Les sections suivantes décrivent chacune des configurations de KubernetesPodOperator
de l'exemple. Pour plus d'informations sur chaque variable de configuration, consultez la documentation de référence Airflow.
Configuration minimale
Pour créer une KubernetesPodOperator
, seuls le name
du pod, l'namespace
où exécuter le pod, le image
à utiliser et le task_id
sont obligatoires.
Lorsque vous placez l'extrait de code suivant dans un DAG, la configuration utilise les valeurs par défaut dans /home/airflow/composer_kube_config
. Il n'est pas nécessaire de modifier le code pour que la tâche pod-ex-minimum
réussisse.
Configuration du modèle
Airflow est compatible avec la modélisation Jinja.
Vous devez déclarer les variables requises (task_id
, name
, namespace
, image
) avec l'opérateur. Comme le montre l'exemple suivant, vous pouvez modéliser tous les autres paramètres avec Jinja, y compris cmds
, arguments
, env_vars
et config_file
.
Sans modification du DAG ou de votre environnement, la tâche ex-kube-templates
échoue en raison de deux erreurs. Les journaux indiquent que cette tâche échoue car la variable appropriée n'existe pas (my_value
). La deuxième erreur, que vous pouvez obtenir après avoir corrigé la première erreur, indique que la tâche échoue car core/kube_config
est introuvable dans config
.
Pour corriger ces deux erreurs, suivez les étapes décrites plus loin.
Pour définir my_value
avec gcloud
ou l'interface utilisateur d'Airflow, procédez comme suit :
Interface utilisateur d'Airflow
Dans l'interface utilisateur d'Airflow 2:
Accédez à l'interface utilisateur d'Airflow.
Dans la barre d'outils, sélectionnez Admin > Variables (Administration > Variables).
Sur la page List Variable (Variable de liste), cliquez sur Add a new record (Ajouter un enregistrement).
Sur la page Add Variable (Ajouter une variable), saisissez les informations suivantes :
- Key (Clé) :
my_value
- Val (Valeur) :
example_value
- Key (Clé) :
Cliquez sur Enregistrer.
gcloud
Pour Airflow 2, saisissez la commande suivante :
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables set -- \
my_value example_value
Remplacez :
ENVIRONMENT
par le nom de l'environnement.LOCATION
par la région où se trouve l'environnement.
Pour faire référence à un config_file
personnalisé (fichier de configuration Kubernetes), remplacez l'option de configuration Airflow kube_config
par une configuration Kubernetes valide:
Section | Clé | Valeur |
---|---|---|
core |
kube_config |
/home/airflow/composer_kube_config |
Patientez quelques minutes pendant la mise à jour de votre environnement. Exécutez ensuite à nouveau la tâche ex-kube-templates
et vérifiez que la tâche ex-kube-templates
réussit.
Configuration des variables de secret
Un secret Kubernetes
est un objet qui contient des données sensibles. Vous pouvez transmettre des secrets aux pods Kubernetes à l'aide de KubernetesPodOperator
.
Les secrets doivent être définis dans Kubernetes, faute de quoi le lancement du pod est impossible.
Cet exemple présente deux façons d'utiliser les secrets Kubernetes: en tant que variable d'environnement et en tant que volume installé par le pod.
Le premier secret, airflow-secrets
, est défini sur une variable d'environnement Kubernetes nommée SQL_CONN
(par opposition à une variable d'environnement Airflow ou Cloud Composer).
Le second secret, service-account
, installe service-account.json
, un fichier contenant un jeton de compte de service, dans /var/secrets/google
.
Voici à quoi ressemblent ces secrets :
Le nom du premier secret Kubernetes est défini dans la variable secret
.
Ce secret particulier est nommé airflow-secrets
. Il est exposé en tant que variable d'environnement, comme indiqué par deploy_type
. La variable d'environnement sur laquelle il est défini, deploy_target
, est SQL_CONN
. Enfin, la clé (key
) du secret stocké dans deploy_target
est sql_alchemy_conn
.
Le nom du second secret Kubernetes est défini dans la variable secret
.
Ce secret particulier est nommé service-account
. Il est exposé en tant que volume, comme indiqué par deploy_type
. Le chemin d'accès au fichier à installer, deploy_target
, est /var/secrets/google
. Enfin, la clé (key
) du secret stocké dans deploy_target
est service-account.json
.
Voici à quoi ressemble la configuration de l'opérateur :
Sans modification du DAG ou de votre environnement, la tâche ex-kube-secrets
échoue. Si vous consultez les journaux, la tâche échoue en raison d'une erreur Pod took too long to start
. Cette erreur se produit car Airflow ne trouve pas le secret spécifié dans la configuration, secret_env
.
gcloud
Pour définir le secret à l'aide de gcloud
, procédez comme suit :
Obtenez des informations sur le cluster de votre environnement Cloud Composer.
Exécutez la commande suivante :
gcloud composer environments describe ENVIRONMENT \ --location LOCATION \ --format="value(config.gkeCluster)"
Remplacez :
ENVIRONMENT
par le nom de votre environnement.LOCATION
par la région où se trouve l'environnement Cloud Composer.
Le résultat de cette commande utilise le format suivant :
projects/<your-project-id>/locations/<location-of-composer-env>/clusters/<your-cluster-id>
.Pour obtenir l'ID du cluster GKE, copiez le résultat après
/clusters/
(se termine par-gke
).
Connectez-vous à votre cluster GKE en exécutant la commande suivante :
gcloud container clusters get-credentials CLUSTER_ID \ --project PROJECT \ --region LOCATION
Remplacez :
CLUSTER_ID
par votre ID de cluster GKE.PROJECT
par l'ID de votre projet Google Cloud.LOCATION
par la région où se trouve l'environnement Cloud Composer.
Créer des secrets Kubernetes
Créez un secret Kubernetes qui définit la valeur de
sql_alchemy_conn
surtest_value
en exécutant la commande suivante :kubectl create secret generic airflow-secrets \ --from-literal sql_alchemy_conn=test_value -n composer-user-workloads
Créez un secret Kubernetes qui définit la valeur de
service-account.json
sur le chemin d'accès local d'un fichier de clé de compte de service appelékey.json
en exécutant la commande suivante :kubectl create secret generic service-account \ --from-file service-account.json=./key.json -n composer-user-workloads
Après avoir défini les secrets, exécutez à nouveau la tâche
ex-kube-secrets
dans l'interface utilisateur d'Airflow.Vérifiez que la tâche
ex-kube-secrets
réussit.
Configuration complète
Cet exemple présente toutes les variables que vous pouvez configurer dans KubernetesPodOperator
. Il n'est pas nécessaire de modifier le code pour que la tâche ex-all-configs
réussisse.
Pour en savoir plus sur chaque variable, consultez la documentation de référence Airflow au sujet de KubernetesPodOperator
.
Informations sur le fournisseur CNCF Kubernetes
GKEStartPodOperator et KubernetesPodOperator sont implémentés dans le fournisseur apache-airflow-providers-cncf-kubernetes
.
Pour obtenir les notes de version refusées pour le fournisseur CNCF Kubernetes, consultez le site Web du fournisseur Kubernetes CNCF.
Version 6.0.0
Dans la version 6.0.0 du package fournisseur Kubernetes CNCF,
la connexion kubernetes_default
est utilisée par défaut dans
la KubernetesPodOperator
.
Si vous avez spécifié une connexion personnalisée dans la version 5.0.0, cette connexion personnalisée
est toujours utilisée par l'opérateur. Pour revenir à l'utilisation de kubernetes_default
vous pouvez ajuster vos DAG en conséquence.
Version 5.0.0
Cette version introduit quelques modifications incompatibles avec les versions antérieures
par rapport à la version 4.4.0. Les plus importants sont liés
La connexion kubernetes_default
, qui n'est pas utilisée dans la version 5.0.0.
- La connexion
kubernetes_default
doit être modifiée. Chemin d'accès à la configuration du kube doit être défini sur/home/airflow/composer_kube_config
(comme illustré dans la figure 1). À la place, vous devez ajouterconfig_file
à la configurationKubernetesPodOperator
(comme indiqué dans le code suivant : exemple).
- Modifiez le code d'une tâche à l'aide de KubernetesPodOperator comme suit:
KubernetesPodOperator(
# config_file parameter - can be skipped if connection contains this setting
config_file="/home/airflow/composer_kube_config",
# definition of connection to be used by the operator
kubernetes_conn_id='kubernetes_default',
...
)
Pour en savoir plus sur la version 5.0.0, consultez les notes de version du fournisseur Kubernetes CNCF.
Dépannage
Conseils pour résoudre les problèmes liés aux pods
En plus de consulter les journaux des tâches dans l'interface utilisateur d'Airflow, vous devez également consulter les journaux suivants:
Sortie du programmeur et des nœuds de calcul Airflow :
Dans la console Google Cloud, accédez à la page Environnements.
Suivez le lien DAG pour votre environnement.
Dans le bucket de votre environnement, augmentez un niveau.
Examinez les journaux dans le dossier
logs/<DAG_NAME>/<TASK_ID>/<EXECUTION_DATE>
.
Journaux détaillés des pods dans la console Google Cloud des charges de travail GKE. Ces journaux incluent le fichier YAML de définition des pods, les événements des pods et les détails des pods.
Codes de retour non nuls si GKEStartPodOperator
est également utilisé
Lorsque vous utilisez KubernetesPodOperator
et GKEStartPodOperator
, le code de retour du point d'entrée du conteneur détermine si la tâche est considérée comme réussie ou non. Les codes de retour non nuls indiquent un échec.
Un schéma courant lors de l'utilisation de KubernetesPodOperator
et de GKEStartPodOperator
consiste à exécuter un script shell comme point d'entrée du conteneur pour regrouper plusieurs opérations dans le conteneur.
Si vous écrivez un script de ce type, nous vous recommandons d'inclure la commande set -e
en haut du script afin que les commandes ayant échoué dans le script arrêtent le script et propagent l'échec à l'instance de tâche Airflow.
Délais d'inactivité des pods
Le délai d'inactivité par défaut de KubernetesPodOperator
est de 120 secondes, ce qui signifie que ce délai peut être dépassé avant que des images volumineuses ne puissent être téléchargées. Vous pouvez augmenter le délai d'inactivité en modifiant le paramètre startup_timeout_seconds
lorsque vous créez l'opérateur KubernetesPodOperator
.
Lorsqu'un pod expire, le journal spécifique à la tâche est disponible dans l'interface utilisateur d'Airflow. Exemple :
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
Les délais d'inactivité des pods peuvent également se produire Compte de service Cloud Composer ne dispose pas des autorisations IAM nécessaires pour effectuer la tâche à votre main. Pour vérifier cela, examinez les erreurs au niveau du pod à l'aide des tableaux de bord GKE, afin de consulter les journaux de votre charge de travail en question, ou utilisez Cloud Logging.
Échec de l'établissement d'une nouvelle connexion
La mise à niveau automatique est activée par défaut dans les clusters GKE. Si un pool de nœuds se trouve dans un cluster en cours de mise à niveau, le message d'erreur suivant peut s'afficher :
<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused
Pour vérifier si votre cluster effectue une mise à niveau, accédez à la page Clusters Kubernetes dans la console Google Cloud, puis recherchez l'icône de chargement à côté du nom du cluster de votre environnement.