Ce tutoriel décrit une manière d'automatiser une infrastructure cloud en utilisant Cloud Composer. L'exemple montre comment planifier des sauvegardes automatisées d'instances de machine virtuelle (VM) Compute Engine.
Cloud Composer est un service d'orchestration de workflow entièrement géré sur Google Cloud. Cloud Composer permet de créer des workflows à l'aide d'une API Python, de les planifier pour une exécution automatique ou de les démarrer manuellement, et de surveiller l'exécution des tâches en temps réel via une interface graphique.
Cloud Composer est basé sur Apache Airflow. Google exécute cette plate-forme d'orchestration Open Source sur un cluster Google Kubernetes Engine (GKE). Ce cluster permet de gérer les nœuds de calcul Airflow et offre de nombreuses possibilités d'intégration avec d'autres produits Google Cloud.
Ce tutoriel est destiné aux opérateurs, aux administrateurs informatiques et aux développeurs souhaitant automatiser leurs infrastructures et approfondir leurs connaissances techniques des principales fonctionnalités de Cloud Composer. Ce tutoriel ne doit pas être considéré comme un guide de reprise après sinistre de niveau entreprise, ni comme un recueil des bonnes pratiques en matière de sauvegardes. Pour en savoir plus sur la création d'un plan de reprise après sinistre pour votre entreprise, consultez le guide de planification de reprise après sinistre.
Définir l'architecture
Les workflows Cloud Composer sont constitués de graphes orientés acycliques, ou DAG (Directed Acyclic Graphs). Du point de vue d'Airflow, un DAG est un ensemble de tâches, organisé de façon à refléter leurs interdépendances directionnelles. Dans ce tutoriel, vous apprendrez à définir un workflow Airflow exécuté périodiquement afin d'effectuer la sauvegarde d'une instance de machine virtuelle Compute Engine à l'aide d'instantanés Persistent Disk.
La VM Compute Engine utilisée comme exemple est une instance associée à un disque persistant de démarrage. Conformément aux consignes sur la prise d'instantanés énoncées plus loin, le workflow de sauvegarde de Cloud Composer appelle l'API Compute Engine pour arrêter l'instance, prendre un instantané du disque persistant et redémarrer l'instance. Entre ces tâches, le workflow attend la fin de chaque opération avant de poursuivre.
Cette architecture est illustrée dans le schéma ci-dessous :
Avant de commencer le tutoriel, nous allons voir comment créer un environnement Cloud Composer. Cet environnement présente l'avantage d'utiliser plusieurs produits Google Cloud, sans qu'il soit nécessaire de les configurer individuellement.
- Cloud Storage : le DAG, le plug-in et les journaux Airflow sont stockés dans un bucket Cloud Storage.
- Google Kubernetes Engine : la plate-forme Airflow est basée sur une architecture de micro-services et peut être utilisée dans GKE.
- Les nœuds de calcul Airflow chargent le plug-in et les définitions de workflow à partir de Cloud Storage, puis exécutent chaque tâche à l'aide de l'API Compute Engine.
- Le planificateur Airflow assure l'exécution des sauvegardes selon la périodicité et l'ordre des tâches spécifiés.
- Redis est utilisé comme agent de messagerie entre les composants Airflow.
- La communication avec le dépôt de métadonnées est assurée par le proxy Cloud SQL.
- Cloud SQL et App Engine Flex : Cloud Composer utilise également une instance Cloud SQL pour les métadonnées, ainsi qu'une application App Engine Flex qui fournit l'interface utilisateur d'Airflow. Ces ressources ne figurent pas dans le schéma car elles résident dans un projet séparé géré par Google.
Consultez la section Présentation de Cloud Composer pour obtenir d'autres d'informations.
Effectuer le scaling du workflow
Le cas d'utilisation présenté dans ce tutoriel est simple : il s'agit de prendre un instantané d'une seule machine virtuelle à intervalles fixes. Toutefois, en conditions réelles, ce type de scénario peut inclure des centaines de VM rattachées à différentes parties de l'organisation ou à différents niveaux d'un système, chacune nécessitant différentes planifications de sauvegarde. Le scaling s'applique non seulement à notre exemple de VM Compute Engine, mais également à tout composant d'infrastructure pour lequel un processus planifié doit être exécuté.
Cloud Composer excelle dans ces cas d'utilisation complexes, car il s'agit d'un véritable moteur de workflow basé sur Apache Airflow hébergé dans le cloud, et non pas d'une simple alternative à Cloud Scheduler ou cron
.
Les DAG d'Airflow, qui constituent des représentations flexibles d'un workflow, s'adaptent aux besoins du monde réel tout en s'exécutant à partir d'un codebase unique. Pour créer des DAG adaptés à votre cas d'utilisation, vous pouvez utiliser une combinaison des deux approches ci-dessous :
- Créer une instance de DAG pour des groupes de composants d'infrastructure où une même planification peut être appliquée pour démarrer le processus
- Créer des instances de DAG indépendantes pour des groupes de composants d'infrastructure nécessitant leur propre planification
Un DAG peut traiter des composants en parallèle. Pour cela, une tâche doit démarrer une opération asynchrone pour chaque composant, ou vous devez créer une branche pour traiter chaque composant. Vous pouvez créer des DAG de manière dynamique à l'aide de code pour ajouter ou supprimer des branches et des tâches selon vos besoins.
En outre, vous pouvez modéliser les dépendances entre les niveaux d'application dans le même DAG. Par exemple, vous pouvez arrêter toutes les instances du serveur Web avant d'arrêter les instances du serveur d'applications.
Ces optimisations dépassent le cadre du présent tutoriel.
Bonnes pratiques à suivre pour les instantanés
Persistent Disk est un service de stockage de blocs durable qui peut être associé à une instance de machine virtuelle et utilisé comme disque de démarrage principal de l'instance ou comme disque secondaire non amorçable destiné aux données critiques. Les disques persistants sont hautement disponibles : pour chaque écriture, trois instances dupliquées sont écrites. Toutefois, une seule d'entre elles est facturée aux clients de Google Cloud.
Un instantané est une copie exacte d'un disque persistant à un moment donné. Les instantanés sont incrémentiels et compressés. Ils sont stockés de manière transparente dans Cloud Storage.
Il est possible de prendre des instantanés de n'importe quel disque persistant lorsque des applications sont en cours d'exécution. Aucun instantané ne contiendra jamais un bloc partiellement écrit. Toutefois, si une opération d'écriture couvrant plusieurs blocs est en cours lorsque le backend reçoit la requête de création de l'instantané, celui-ci peut ne contenir que quelques-uns des blocs mis à jour. Ces incohérences se traitent de la même manière que vous traiteriez des arrêts non propres.
Nous vous recommandons de suivre ces consignes pour assurer la cohérence des instantanés :
- Réduisez ou évitez les écritures sur disque pendant le processus de création d'un instantané. Planifiez vos sauvegardes pendant les heures creuses dans la mesure du possible.
- Pour les disques secondaires non amorçables, désactivez les applications et les processus qui écrivent des données et gelez ou désinstallez le système de fichiers.
Pour les disques de démarrage, il n’est pas toujours possible ni recommandé de geler le volume racine. Choisissez plutôt l'approche consistant à arrêter l'instance de machine virtuelle avant de prendre l'instantané.
Pour éviter les temps d'arrêt de service dus au gel ou à l'arrêt d'une machine virtuelle, nous vous conseillons d'utiliser une architecture à haute disponibilité. Pour plus d'informations, consultez la section Scénarios de reprise après sinistre pour les applications.
Établissez une convention d'attribution de noms cohérente pour les instantanés. Par exemple, utilisez un horodatage selon un niveau de précision approprié, concaténé avec le nom de l'instance, du disque et de la zone.
Pour en savoir plus sur la création d'instantanés cohérents, reportez-vous à la section Bonnes pratiques en matière d'instantanés.
Objectifs
- Créer des opérateurs Airflow personnalisés ainsi qu'un capteur pour Compute Engine.
- Créer un workflow Cloud Composer à l'aide des opérateurs Airflow et d'un capteur.
- Planifier le workflow de façon à sauvegarder une instance de Compute Engine à intervalles réguliers.
Coûts
Vous pouvez utiliser le Simulateur de coût pour générer une estimation des coûts en fonction de votre utilisation prévue.
Avant de commencer
- Connectez-vous à votre compte Google.
Si vous n'en possédez pas déjà un, vous devez en créer un.
-
Dans Google Cloud Console, sur la page de sélection du projet, sélectionnez ou créez un projet Google Cloud.
-
Assurez-vous que la facturation est activée pour votre projet Cloud. Découvrez comment vérifier que la facturation est activée pour votre projet.
Créez un environnement Cloud Composer. Pour réduire les coûts, choisissez une taille de disque de 20 Go.
Accéder à la page Créer l'environnementLa mise en service de l'environnement Cloud Composer prend environ 15 minutes, mais peut durer jusqu'à une heure.
Le code complet de ce tutoriel est disponible sur GitHub. Pour examiner les fichiers au fur et à mesure, ouvrez le dépôt dans Cloud Shell :
Accéder à Cloud Shell- Dans le répertoire de base de la console Cloud Shell, exécutez la commande suivante :
git clone https://github.com/GoogleCloudPlatform/composer-infra-python.git
Une fois que vous avez terminé ce tutoriel, vous pouvez éviter de continuer à payer des frais en supprimant les ressources que vous avez créées. Consultez la page Effectuer un nettoyage pour en savoir plus.
Configurer un exemple d'instance Compute Engine
La première étape consiste à créer l'exemple d'instance de machine virtuelle Compute Engine à sauvegarder. Cette instance exécute WordPress, un système de gestion de contenu Open Source.
Suivez ces étapes pour créer l'instance WordPress sur Compute Engine :
- Dans Google Cloud Marketplace, accédez à la page de lancement WordPress Certified by Bitnami.
- Cliquez sur Lancer.
Une fenêtre pop-up contenant la liste de vos projets s'affiche. Sélectionnez le projet que vous avez créé précédemment pour ce tutoriel.
Google Cloud configure les API nécessaires dans votre projet et, après une courte attente, affiche un écran avec les différentes options de configuration de votre instance WordPress Compute Engine.
Si vous le souhaitez, définissez le type de disque de démarrage sur SSD pour accélérer le démarrage de l'instance.
Cliquez sur Déployer.
Vous êtes dirigé vers l'écran de Deployment Manager, qui vous permet de suivre l'état du déploiement.
Le script WordPress Deployment Manager crée l'instance WordPress Compute Engine et deux règles de pare-feu pour permettre au trafic TCP d'atteindre l'instance via les ports 80 et 443. Ce processus peut prendre plusieurs minutes, chaque élément étant déployé et affichant une icône de progression en forme de roue.
À la fin du processus, l'instance WordPress est prête et diffuse le contenu par défaut sur l'URL du site Web. L'écran Deployment Manager affiche l'URL du site Web (Site address), l'URL de la console d'administration (Admin URL), le nom de l'utilisateur et le mot de passe, les liens vers la documentation et les étapes suivantes suggérées.
Cliquez sur l'adresse du site pour vérifier que votre instance WordPress est opérationnelle. Une page de blog WordPress par défaut doit s'afficher.
L'exemple d'instance Compute Engine est maintenant prêt. L'étape suivante consiste à configurer un processus de sauvegarde incrémentielle automatique du disque persistant de cette instance.
Création d'opérateurs Airflow personnalisés
Pour sauvegarder le disque persistant de l'instance de test, vous pouvez créer un workflow Airflow qui arrête l'instance, prend un instantané de son disque persistant et redémarre l'instance. Chacune de ces tâches est définie sous forme de code avec un opérateur Airflow personnalisé. Le code des opérateurs est ensuite rassemblé dans un plug-in Airflow.
Dans cette section, vous allez apprendre à créer des opérateurs Airflow personnalisés qui appellent la bibliothèque cliente Python Compute Engine afin de contrôler le cycle de vie de l'instance. D'autres possibilités existent pour cette opération, par exemple :
- Utiliser l'opérateur Airflow
BashOperator
pour exécuter les commandesgcloud compute
- Utiliser l'opérateur Airflow
HTTPOperator
pour exécuter des appels HTTP directs à l'API REST Compute Engine - Utiliser l'opérateur Airflow
PythonOperator
pour appeler des fonctions Python arbitraires sans définir d'opérateurs personnalisés
Ces alternatives ne sont pas traitées dans le présent tutoriel.
Autoriser les appels à l'API Compute Engine
Les opérateurs personnalisés que vous créez dans ce tutoriel utilisent la bibliothèque cliente Python pour appeler l'API Compute Engine. Les requêtes adressées à l'API doivent être authentifiées et autorisées. La méthode recommandée consiste à utiliser une stratégie appelée Application Default Credentials (ADC).
La stratégie ADC est appliquée chaque fois qu'un appel est effectué à partir d'une bibliothèque cliente :
- La bibliothèque vérifie si un compte de service est spécifié dans la variable d'environnement
GOOGLE_APPLICATION_CREDENTIALS
. - Si le compte de service n'est pas spécifié, la bibliothèque utilise le compte de service par défaut fourni par Compute Engine ou GKE.
Si ces deux méthodes échouent, une erreur se produit.
Les opérateurs Airflow créés dans ce tutoriel relèvent de la seconde méthode. Lorsque vous créez l'environnement Cloud Composer, un cluster GKE est provisionné. Les nœuds de ce cluster exécutent des pods de nœuds de calcul Airflow. À leur tour, ces nœuds de calcul exécutent le workflow avec les opérateurs personnalisés que vous définissez. Comme vous n'avez pas spécifié de compte de service lors de la création de l'environnement, le compte de service par défaut des nœuds de cluster GKE est celui utilisé par la stratégie ADC.
Les nœuds de cluster GKE sont des instances Compute Engine. Les identifiants associés au compte de service par défaut de Compute Engine dans le code de l'opérateur sont donc faciles à obtenir.
Ce code utilise les identifiants par défaut de l'application pour créer un client Python qui enverra les requêtes à l'API Compute Engine. Dans les sections suivantes, vous référencerez ce code à chaque création d'un opérateur Airflow.
Au lieu d'utiliser le compte de service Compute Engine par défaut, il est possible d'en créer un et de le configurer en tant que connexion dans la console d'administration Airflow. Cette méthode, décrite dans la page Gérer les connexions Airflow, permet un contrôle d'accès plus précis aux ressources Google Cloud. Cette alternative n'est pas traitée dans le présent tutoriel.
Arrêter l'instance Compute Engine en toute sécurité
Cette section analyse la création du premier opérateur Airflow personnalisé, StopInstanceOperator
. Cet opérateur appelle l'API Compute Engine pour arrêter l'instance Compute Engine qui exécute WordPress :
Dans Cloud Shell, ouvrez le fichier
gce_commands_plugin.py
à l'aide d'un éditeur de texte tel que Nano ou Vim :vi $HOME/composer-infra-python/no_sensor/plugins/gce_commands_plugin.py
Examinez les importations situées en haut du fichier :
Les importations notables sont les suivantes :
BaseOperator
: classe de base dont tous les opérateurs personnalisés Airflow doivent hériter.AirflowPlugin
: classe de base pour créer un groupe d'opérateurs, formant un plug-in.apply_defaults
: décorateur de fonctions qui remplit les arguments avec les valeurs par défaut si ces valeurs ne sont pas spécifiées dans le constructeur d'opérateur.GoogleCredentials
: classe utilisée pour récupérer les identifiants par défaut de l'application.googleapiclient.discovery
: point d'entrée de la bibliothèque cliente permettant la découverte des API Google sous-jacentes. Dans ce cas, la bibliothèque cliente génère une ressource permettant d'interagir avec l'API Compute Engine.
Ensuite, examinez la classe
StopInstanceOperator
située au-dessous des importations :La classe
StopInstanceOperator
comporte trois méthodes :__init__
: constructeur de la classe. Reçoit le nom du projet, la zone dans laquelle l'instance est en cours d'exécution et le nom de l'instance que vous souhaitez arrêter. Initialise également la variableself.compute
en appelantget_compute_api_client
.get_compute_api_client
: méthode d'assistance qui renvoie une instance de l'API Compute Engine. Applique la stratégie ADC fournie parGoogleCredentials
pour s'authentifier auprès de l'API et autoriser les appels suivants.execute
: méthode principale de l'opérateur redéfinie parBaseOperator
. Airflow appelle cette méthode pour exécuter l'opérateur. La méthode enregistre un message d'information dans les journaux, puis appelle l'API Compute Engine pour arrêter l'instance Compute Engine spécifiée par les trois paramètres reçus dans le constructeur. À la fin, la fonctionsleep()
attend que l'instance soit arrêtée. Dans un environnement de production, vous devez utiliser une méthode plus déterministe, telle que la communication croisée entre opérateurs. Cette technique est décrite plus loin dans ce tutoriel.
La méthode stop()
de l'API Compute Engine arrête proprement l'instance de machine virtuelle.
Le système d'exploitation exécute les scripts d'arrêt init.d
, y compris celui de WordPress, dans le répertoire /etc/init.d/bitnami
. Ce script gère également le démarrage de WordPress lorsque la machine virtuelle est redémarrée. Vous pouvez consulter la définition du service avec la configuration d'arrêt et de démarrage dans le répertoire /etc/systemd/system/bitnami.service.
Créer des instantanés de sauvegarde incrémentielle nommés de manière unique
Vous allez créer dans cette section le deuxième opérateur personnalisé, SnapshotDiskOperator
. Cet opérateur prend un instantané du disque persistant de l'instance.
Dans le fichier gce_commands_plugin.py
que vous avez ouvert dans la section précédente, examinez la classe SnapshotDiskOperator
:
La classe SnapshotDiskOperator
comporte les méthodes suivantes :
__init__
: constructeur de la classe. Semblable au constructeur de la classeStopInstanceOperator
, mais en plus du nom du projet, de la zone et de l'instance, reçoit le nom du disque à partir duquel créer l'instantané. En effet, une instance peut être associée à plusieurs disques persistants.generate_snapshot_name
: cet exemple de méthode crée un nom unique simple pour chaque instantané à partir du nom de l'instance, de la date et de l'heure avec une précision à la seconde. Adaptez le nom à vos besoins en ajoutant, par exemple, le nom du disque lorsque plusieurs disques sont associés à une instance, ou en augmentant la précision temporelle pour accepter les requêtes de création d'instantanés ad hoc.execute
: méthode principale de l'opérateur redéfinie parBaseOperator
. Génère un nom d'instantané à l'aide de la méthodegenerate_snapshot_name
lors de son exécution par les nœuds de calcul Airflow. Enregistre ensuite un message d’information et appelle l’API Compute Engine pour créer l’instantané avec les paramètres reçus dans le constructeur.
Démarrer l'instance Compute Engine
Dans cette section, vous allez créer le troisième et dernier opérateur personnalisé, StartInstanceOperator
. Cet opérateur redémarre une instance Compute Engine.
Dans le fichier gce_commands_plugin.py
que vous avez précédemment ouvert, examinez la classe SnapshotDiskOperator
située en bas du fichier :
La classe StartInstanceOperator
comporte les méthodes suivantes :
__init__
: constructeur de la classe. Semblable au constructeur de la classeStopInstanceOperator
.execute
: méthode principale de l'opérateur redéfinie parBaseOperator
. Diffère des opérateurs précédents en appelant l'API Compute Engine appropriée pour démarrer l'instance indiquée dans les paramètres d'entrée du constructeur.
Définir le workflow Airflow
Vous venez de définir un plug-in Airflow composé de trois opérateurs. Ces opérateurs définissent les tâches faisant partie d'un workflow Airflow. Le workflow présenté ici est simple et linéaire, mais certains peuvent prendre la forme plus complexe des graphes orientés acycliques (DAG).
Dans cette section, vous allez créer la classe de plug-in qui expose les trois opérateurs, créer le DAG à l'aide de ces opérateurs, puis déployer le DAG sur Cloud Composer, et enfin exécuter le DAG.
Créer le plug-in
Jusqu'à présent, les opérateurs de démarrage, d'instantané et d'arrêt sont contenus dans le fichier gce_commands_plugin.py
. Pour utiliser ces opérateurs dans un workflow, vous devez les inclure dans une classe de plug-in.
Notez la classe
GoogleComputeEnginePlugin
au bas du fichiergce_commands_plugin.py
:Cette classe, qui hérite de
AirflowPlugin
, attribue au plug-in le nom internegce_commands_plugin
et y ajoute les trois opérateurs.Fermez le fichier
gce_commands_plugin.py
.
Configurer le graphe orienté acyclique
Le DAG définit le workflow exécuté par Airflow. Pour que le DAG identifie le disque à sauvegarder, vous devez définir certaines variables : l'instance Compute Engine à laquelle le disque est rattaché, la zone sur laquelle l'instance est exécutée et le projet dans lequel toutes les ressources sont disponibles.
Vous pouvez coder ces variables en dur dans le code source du DAG, mais il est recommandé de les définir en tant que variables Airflow. De cette manière, toutes les modifications de configuration peuvent être gérées de façon centralisée et indépendamment des déploiements de code.
Définissez la configuration du DAG comme suit :
Dans Cloud Shell, définissez le site de votre environnement Cloud Composer :
LOCATION=[CLOUD_ENV_LOCATION]
L'emplacement correspond à la région Compute Engine dans laquelle se trouve l'environnement Cloud Composer, par exemple :
us-central1
oueurope-west1
. Définie lors de la création de l'environnement, cette région est indiquée dans la page de la console Cloud Composer.Définissez le nom de l'environnement Cloud Composer :
ENVIRONMENT=$(gcloud composer environments list \ --format="value(name)" --locations $LOCATION)
Le paramètre
--format
permet de ne sélectionner que la colonnename
dans la table correspondante. Nous supposons ici qu'un seul environnement a été créé.Créez la variable
PROJECT
dans Airflow à partir du nom du projet Google Cloud actuel :gcloud composer environments run $ENVIRONMENT --location $LOCATION \ variables -- --set PROJECT $(gcloud config get-value project)
Où :
gcloud composer environments run
permet d'exécuter les commandes Airflow CLI.- La commande Airflow
variables
définit la variable AirflowPROJECT
sur la valeur renvoyée pargcloud config
.
Créez la variable
INSTANCE
dans Airflow avec le nom de l'instance WordPress :gcloud composer environments run $ENVIRONMENT --location $LOCATION \ variables -- --set INSTANCE \ $(gcloud compute instances list \ --format="value(name)" --filter="name~'.*wordpress.*'")
Cette commande utilise le paramètre
--filter
pour ne sélectionner que l'instance dont la valeurname
correspond à une expression régulière contenant la chaînewordpress
. Cette approche suppose qu’il n’y a qu’une seule instance de ce type et que le terme "wordpress" est inclus dans le nom de l'instance et du disque, ce qui est le cas si vous avez accepté les valeurs par défaut.Créez la variable
ZONE
dans Airflow à partir de la zone de l'instance WordPress :gcloud composer environments run $ENVIRONMENT --location $LOCATION \ variables -- --set ZONE \ $(gcloud compute instances list \ --format="value(zone)" --filter="name~'.*wordpress.*'")
Créez la variable
DISK
dans Airflow à partir du nom du disque persistant associé à l'instance WordPress :gcloud composer environments run $ENVIRONMENT --location $LOCATION \ variables -- --set DISK \ $(gcloud compute disks list \ --format="value(name)" --filter="name~'.*wordpress.*'")
Vérifiez que les variables Airflow ont été créées correctement :
Dans Cloud Console, accédez à la page Cloud Composer.
Dans la colonne Serveur Web Airflow, cliquez sur le lien Airflow. La page principale du serveur Web Airflow s’ouvre dans un nouvel onglet.
Cliquez sur Admin, puis sur Variables.
La liste affiche les variables de configuration du DAG.
Créer le graphe orienté acyclique
La définition du DAG réside dans un fichier Python dédié. La prochaine étape consiste à créer le DAG, en reliant les trois opérateurs du plug-in.
Dans Cloud Shell, ouvrez le fichier
backup_vm_instance.py
à l'aide d'un éditeur de texte tel que Nano ou Vim :vi $HOME/composer-infra-python/no_sensor/dags/backup_vm_instance.py
Examinez les importations situées en haut du fichier :
Pour résumer ces importations :
DAG
correspond à la classe du graphe orienté acyclique défini par Airflow.DummyOperator
permet de créer les opérateurs "no-op" de début et de fin pour faciliter la visualisation du workflow. Dans les DAG plus complexes,DummyOperator
permet de joindre des branches et de créer ainsi des sous-DAG.- Le DAG utilise les trois opérateurs que vous avez définis dans les sections précédentes.
Définissez les valeurs des paramètres à transmettre aux constructeurs d'opérateurs :
Où :
INTERVAL
définit la fréquence d'exécution du workflow de sauvegarde. Le code précédent spécifie une récurrence quotidienne à l'aide d'un préréglage cron Airflow. Si vous souhaitez utiliser un intervalle différent, consultez la page de référence sur les exécutions de DAG. Vous pouvez également déclencher le workflow manuellement, indépendamment de la planification.START_DATE
définit le moment précis auquel le démarrage des sauvegardes est planifié. Vous n'avez pas besoin de le modifier.- Les autres valeurs sont extraites des variables Airflow que vous avez configurées dans la section précédente.
Utilisez le code suivant pour créer le DAG avec certains des paramètres définis précédemment. Ce code attribue également un nom et une description au DAG, lesquels s'affichent dans l'interface utilisateur de Cloud Composer.
Ajoutez les tâches au DAG, soit les instances d'opérateur :
Ce code instancie toutes les tâches nécessaires au workflow en transmettant les paramètres définis aux constructeurs d'opérateurs correspondants.
- Les valeurs
task_id
représentent les identifiants uniques qui seront affichés dans l'interface utilisateur de Cloud Composer. Vous les utiliserez plus tard pour transférer des données entre les tâches. retries
définit le nombre de tentatives d'exécution d'une tâche avant échec. Pour les tâchesDummyOperator
, ces valeurs sont ignorées.dag=dag
indique qu'une tâche est associée au DAG créé précédemment. Ce paramètre n'est requis que dans la première tâche du workflow.
- Les valeurs
Définissez la séquence des tâches qui composent le DAG du workflow :
Fermez le fichier
gce_commands_plugin.py
.
Exécuter le workflow
Le workflow représenté par l'opérateur DAG est maintenant prêt à être exécuté par Cloud Composer. Cloud Composer lit les définitions du DAG et du plug-in à partir d'un bucket Cloud Storage associé.
Ce bucket et les répertoires dags
et plugins
correspondants ont été automatiquement créés lors de la création de l'environnement Cloud Composer.
À l'aide de Cloud Shell, vous pouvez copier le DAG et le plug-in dans le bucket Cloud Storage associé :
Dans Cloud Shell, recherchez le nom du bucket :
BUCKET=$(gsutil ls) echo $BUCKET
Vous ne devez trouver qu'un seul bucket, nommé selon le format suivant :
gs://[REGION]-{ENVIRONMENT_NAME]-{ID}-bucket/.
Exécutez le script suivant pour copier les fichiers du DAG et du plug-in dans les répertoires de bucket correspondants :
gsutil cp $HOME/composer-infra-python/no_sensor/plugins/gce_commands_plugin.py "$BUCKET"plugins gsutil cp $HOME/composer-infra-python/no_sensor/dags/backup_vm_instance.py "$BUCKET"dags
Le nom du bucket contient déjà une barre oblique, d'où les guillemets doubles autour de la variable
$BUCKET
.Dans Cloud Console, accédez à la page Cloud Composer.
Dans la colonne Serveur Web Airflow, cliquez sur le lien Airflow. La page principale du serveur Web Airflow s’ouvre dans un nouvel onglet. Attendez deux à trois minutes et actualisez-la. Il se peut que vous deviez patienter et actualiser la page à plusieurs reprises.
Une liste contenant le nouveau DAG s'affiche, semblable à ceci :
Si le code comporte des erreurs de syntaxe, un message apparaît en haut de la table du DAG. En cas d'erreurs d'exécution, celles-ci s'affichent dans la colonne DAG Runs (Exécutions DAG). Corrigez les erreurs avant de poursuivre. Pour ce faire, le moyen le plus simple consiste à copier à nouveau les fichiers dans le bucket à partir du dépôt GitHub.
Pour consulter une trace de la pile plus détaillée, exécutez la commande suivante dans Cloud Shell :
gcloud composer environments run $ENVIRONMENT --location $LOCATION list_dags
Airflow commence à exécuter le workflow immédiatement, comme indiqué dans la colonne DAG Runs (Exécutions DAG).
Le workflow est déjà en cours, mais si vous devez l'exécuter à nouveau, vous pouvez le déclencher manuellement en procédant comme suit :
- Dans la colonne Links (Liens), cliquez sur la première icône, Trigger Dag (Déclencher le DAG), indiquée par une flèche dans la capture d'écran précédente.
Dans la fenêtre pop-up de confirmation En êtes-vous sûr ?, cliquez sur OK.
Le workflow démarre après quelques secondes et une nouvelle exécution apparaît sous la forme d'un cercle vert clair dans la colonne DAG Runs (Exécutions DAG).
Dans la colonne Links (Liens), cliquez sur l'icône Graph View (Vue Graphe), indiquée par une flèche dans la capture d'écran précédente.
La Vue Graphe montre le workflow : les tâches dont l'exécution a réussi sont encadrées en vert foncé et la tâche en cours d'exécution, en vert clair. Quant aux tâches en attente, elles ne sont pas encadrées. Vous pouvez cliquer dessus pour afficher les journaux, consulter les détails et effectuer d'autres opérations.
Pour suivre l'avancement de l'exécution, cliquez de temps à autre sur le bouton d'actualisation situé en haut à droite.
Félicitations ! Vous avez terminé votre première exécution de workflow Cloud Composer. À la fin du processus, un instantané du disque persistant de l'instance Compute Engine est créé.
Dans Cloud Shell, vérifiez la présence de l'instantané :
gcloud compute snapshots list
Vous pouvez également utiliser le menu de Cloud Console pour accéder à la page Compute Engine Snapshots (Instantanés Compute Engine).
Un instantané devrait être visible à ce stade. D'autres instantanés seront créés lors des exécutions suivantes du workflow, déclenchées manuellement ou automatiquement selon le calendrier spécifié.
Les instantanés sont incrémentiels. Le premier instantané est le plus volumineux, car il contient tous les blocs du disque persistant sous forme compressée. Les instantanés suivants ne contiennent que les blocs qui ont été modifiés depuis l'instantané précédent, ainsi que les références aux blocs non modifiés. Ainsi, les instantanés suivants sont moins volumineux que le premier, leur production prend moins de temps, et ils coûtent moins cher.
Si un instantané est supprimé, ses données sont déplacées dans l'instantané correspondant suivant afin de conserver la cohérence des deltas consécutifs stockés dans la chaîne d'instantanés. Ce n'est que lorsque tous les instantanés sont supprimés que toutes les données sauvegardées du disque persistant sont supprimées.
Créer le capteur Airflow personnalisé
Pendant l’exécution du workflow, vous avez probablement remarqué que chaque étape prend un certain temps. Cette attente est due au fait que les opérateurs intègrent une instruction sleep()
à la fin pour laisser le temps à l'API Compute Engine de terminer son travail avant de lancer la tâche suivante.
Cependant, cette méthode n'est pas parfaite et peut entraîner des problèmes inattendus. Par exemple, lors de la création d’un instantané, le temps d’attente peut être trop long pour les instantanés incrémentiels, c'est-à-dire que vous perdez du temps à attendre une fin de tâche qui est déjà terminée. Ou bien le temps d'attente est trop court, ce qui peut mettre en échec l’ensemble du workflow ou produire des résultats peu fiables. En effet, l’instance n’est pas complètement arrêtée ou le processus d’instantané n’est pas terminé au moment du démarrage de la machine.
Vous devez pouvoir indiquer à la tâche suivante que la tâche précédente est terminée. Une solution consiste à utiliser des capteurs Airflow, qui interrompent le workflow jusqu'à ce que certains critères soient remplis. Dans ce cas, le critère indique que l'opération Compute Engine précédente s'est terminée avec succès.
Partager des données entre les tâches par communication croisée
Lorsque les tâches doivent communiquer entre elles, Airflow propose un mécanisme appelé XCom, ou "communication croisée". Cette fonctionnalité établit un échange d'informations entre les tâches sous la forme d'une clé, d'une valeur et d'un horodatage.
Le moyen le plus simple de transmettre un message XCom est de configurer l'opérateur pour qu'il renvoie une valeur à partir de sa méthode execute()
. La valeur peut être n'importe quel objet que Python peut sérialiser à l'aide du module pickle.
Les trois opérateurs décrits dans les sections précédentes appellent l'API Compute Engine. Tous ces appels d'API renvoient un objet Ressource d'opération. Ces objets sont destinés à être utilisés pour gérer les requêtes asynchrones telles que celles des opérateurs Airflow. Chaque objet comporte un champ name
permettant d'interroger le dernier état de l'opération Compute Engine.
Modifiez les opérateurs pour renvoyer le champ name
de l'objet "Ressource d'opération" en procédant comme suit :
Dans Cloud Shell, ouvrez le fichier
gce_commands_plugin.py
à l'aide d'un éditeur de texte tel que Nano ou Vim, cette fois à partir du répertoiresensor/plugins
:vi $HOME/composer-infra-python/sensor/plugins/gce_commands_plugin.py
Dans la méthode "execute" de l'opérateur
StopInstanceOperator
, notez comment le code ci-dessous :a été remplacé par celui-ci :
Où :
- La première ligne capture la valeur renvoyée de l'appel d'API dans la variable
operation
. - La deuxième ligne renvoie le champ
name
de l'opération à partir de la méthodeexecute()
. Cette instruction sérialise le nom à l'aide du module pickle et le transfère dans l'espace partagé XCom interne à la tâche. La valeur sera ensuite extraite dans l'ordre "dernier entré premier sorti".
Si une tâche doit transférer plusieurs valeurs, il est possible d'attribuer à XCom une valeur
key
explicite en appelant directement la méthodexcom_push()
au lieu de renvoyer la valeur.- La première ligne capture la valeur renvoyée de l'appel d'API dans la variable
De même, dans la méthode "execute" de l'opérateur
SnapshotDiskOperator
, notez comment le code ci-dessous :a été remplacé par celui-ci :
Ce code contient deux noms qui ne sont pas liés. Le premier correspond au nom de l'instantané et le second, au nom de l'opération.
Enfin, dans la méthode "execute" de l'opérateur
StartInstanceOperator
, notez comment le code ci-dessous :a été remplacé par celui-ci :
À ce stade, il ne devrait exister aucun appel à la méthode
sleep()
dans le fichiergce_commands_plugin.py
. Assurez-vous que c'est le cas en faisant une recherche sur la valeursleep
dans le fichier. Sinon, revérifiez les étapes précédentes de cette section.Comme aucun appel à la méthode
sleep()
n'a été effectué à partir du code, la ligne suivante a été supprimée de la section des importations en haut du fichier :import time
Fermez le fichier
gce_commands_plugin.py
.
Mettre en œuvre le capteur et le révéler
Dans la section précédente, vous avez modifié chaque opérateur pour qu'il renvoie un nom d'opération Compute Engine. Dans cette section, à partir du nom de l'opération, vous allez créer un capteur Airflow qui interroge l'API Compute Engine afin de savoir quand une opération est terminée.
Dans Cloud Shell, ouvrez le fichier
gce_commands_plugin.py
à l'aide d'un éditeur de texte tel que Nano ou Vim, en veillant à utiliser le répertoiresensor/plugins
:vi $HOME/composer-infra-python/sensor/plugins/gce_commands_plugin.py
Notez la ligne de code suivante en haut de la section d'importation, juste en dessous de la ligne
from airflow.models
import BaseOperator
:from airflow.operators.sensors import BaseSensorOperator
Tous les capteurs sont dérivés de la classe
BaseSensorOperator
et doivent remplacer sa méthodepoke()
.Examinez la nouvelle classe
OperationStatusSensor
:La classe
OperationStatusSensor
comporte les méthodes suivantes :__init__
: constructeur de la classe. Ce constructeur comporte des paramètres semblables à ceux des opérateurs, à l'exception deprior_task_id
. Ce paramètre correspond à l'ID de la tâche précédente.poke
: méthode principale du capteur redéfinie parBaseSensorOperator
. Airflow appelle cette méthode toutes les 60 secondes jusqu'à ce que la valeurTrue
soit renvoyée. Alors seulement, les tâches en aval sont autorisées à s'exécuter.Il est possible de configurer l'intervalle de ces tentatives en transmettant le paramètre
poke_interval
au constructeur. Vous pouvez également définir un paramètretimeout
. Consultez la documentation de référence de l'API BaseSensorOperator pour en savoir plus.Dans la mise en œuvre de la méthode
poke
précédente, la première ligne correspond à un appel àxcom_pull()
. Cette méthode obtient la valeur XCom la plus récente pour la tâche identifiée par le paramètreprior_task_id
. La valeur correspond au nom d'une opération Compute Engine et est stockée dans la variableoperation_name
.Le code exécute alors la méthode
zoneOperations.get()
en transmettant la variableoperation_name
en tant que paramètre pour obtenir le dernier état de l'opération. Si l'état est défini surDONE
, la méthodepoke()
renvoieTrue
, sinon elle renvoieFalse
. Dans le premier cas, les tâches en aval sont démarrées ; dans le second cas, l'exécution du workflow reste en pause et la méthodepoke()
est de nouveau appelée une fois le délai (en secondes) défini par la valeurpoke_interval
écoulé.
Au bas du fichier, notez que la classe
GoogleComputeEnginePlugin
mentionne à présent le capteurOperationStatusSensor
dans la liste des opérateurs exportés par le plug-in :Fermez le fichier
gce_commands_plugin.py
.
Mettre à jour le workflow
Une fois le capteur créé dans le plug-in, vous pouvez l'ajouter au workflow. Dans cette section, vous allez mettre à jour le workflow vers son état final, qui inclut les trois opérateurs, plus les tâches intermédiaires du capteur. Vous procéderez ensuite à l'exécution et à la vérification du workflow mis à jour.
Dans Cloud Shell, ouvrez le fichier
backup_vm_instance.py
à l'aide d'un éditeur de texte tel que Nano ou Vim, cette fois à partir du répertoiresensor/dags
:vi $HOME/composer-infra-python/sensor/dags/backup_vm_instance.py
Dans la section des importations, notez que le nouveau capteur est importé sous la ligne
from airflow operators import StartInstanceOperator
:from airflow.operators import OperationStatusSensor
Examinez les lignes après le commentaire
## Wait tasks
.Le code réutilise
OperationStatusSensor
pour définir trois "tâches d'attente" intermédiaires. Chacune de ces tâches attend que l'opération précédente soit terminée. Les paramètres suivants sont transmis au constructeur de capteur :PROJECT
,ZONE
etINSTANCE
de l’instance WordPress, déjà définis dans le fichier.prior_task_id
: ID de la tâche attendue par le capteur. Par exemple, la tâchewait_for_stop
attend que la tâche dont l'ID eststop_instance
soit terminée.poke_interval
: nombre de secondes pendant lesquelles Airflow doit attendre entre deux tentatives d'appel de la méthodepoke()
du capteur. En d'autres termes, la fréquence de vérification de l'achèvement de la tâcheprior_task_id
.task_id
: ID de la tâche d'attente nouvellement créée.
Au bas du fichier, notez comment le code ci-dessous :
a été remplacé par celui-ci :
Ces lignes définissent le workflow de sauvegarde complet.
Fermez le fichier
backup_vm_instance.py
.
Vous devez maintenant copier le DAG et le plug-in dans le bucket Cloud Storage associé :
Dans Cloud Shell, recherchez le nom du bucket :
BUCKET=$(gsutil ls) echo $BUCKET
Vous ne devez trouver qu'un seul bucket, nommé selon le format suivant :
gs://[REGION]-[ENVIRONMENT_NAME]-[ID]-bucket/
.Exécutez le script suivant pour copier les fichiers du DAG et du plug-in dans les répertoires de bucket correspondants :
gsutil cp $HOME/composer-infra-python/sensor/plugins/gce_commands_plugin.py "$BUCKET"plugins gsutil cp $HOME/composer-infra-python/sensor/dags/backup_vm_instance.py "$BUCKET"dags
Le nom du bucket contient déjà une barre oblique, d'où les guillemets doubles autour de la variable
$BUCKET
.Importez le workflow mis à jour dans Airflow :
Dans Cloud Console, accédez à la page Cloud Composer.
Dans la colonne Airflow, cliquez sur le lien Serveur Web Airflow pour afficher la page Airflow principale.
Attendez deux ou trois minutes que le programme mette à jour le plug-in et le workflow. Vous remarquerez probablement que la table DAG se vide momentanément. Actualisez la page plusieurs fois jusqu'à ce que la section Liens apparaisse de manière cohérente.
Assurez-vous qu'aucune erreur ne s'affiche et, dans la section Liens, cliquez sur Tree View (Arborescence).
À gauche, le workflow est représenté sous forme d'arborescence ascendante. À droite, un graphe présente les exécutions de la tâche à différentes dates. Un carré vert indique une exécution réussie pour la tâche et la date en question. Un carré blanc indique une tâche qui n'a pas encore été exécutée. Comme vous avez mis à jour le DAG avec de nouvelles tâches de capteur, toutes ces tâches sont indiquées en blanc, tandis que les tâches Compute Engine sont marquées en vert.
Exécutez le workflow de sauvegarde mis à jour :
- Dans le menu du haut, cliquez sur DAG pour revenir à la page principale.
- Dans la colonne Liens, cliquez sur Déclencher le DAG.
- Dans la fenêtre pop-up de confirmation En êtes-vous sûr ?, cliquez sur OK. Une exécution commence. Elle apparaît sous la forme d'un cercle vert clair dans la colonne Exécutions DAG.
Sous Liens, cliquez sur l'icône Graph View (Vue Graphe) pour observer l'exécution du workflow en temps réel.
Cliquez sur le bouton d'actualisation situé à droite pour suivre l'exécution de la tâche. Notez que le workflow s’arrête sur chacune des tâches du capteur pour attendre la fin de la tâche précédente. Le temps d'attente est ajusté en fonction des besoins de chaque tâche au lieu de reposer sur une valeur d'intervalle codée en dur.
Pendant l'exécution du workflow, vous pouvez également revenir à Cloud Console, sélectionner le menu Compute Engine, puis cliquer sur Instances de VM pour voir comment la machine virtuelle s'arrête et redémarre. Vous pouvez également cliquer sur Instantanés pour visualiser l'instantané en cours de création.
Vous avez maintenant exécuté un workflow de sauvegarde qui crée un instantané à partir d'une instance Compute Engine. Cet instantané s'inscrit dans les pratiques recommandées et optimise le flux grâce à l'utilisation de capteurs.
Restaurer une instance à partir d'un instantané
Disposer d'un instantané n'est qu'une partie de l'opération de sauvegarde. L'autre partie consiste à pouvoir restaurer votre instance à partir de cet instantané.
Pour créer une instance à partir d'un instantané, procédez comme suit :
Dans Cloud Shell, obtenez une liste des instantanés disponibles :
gcloud compute snapshots list
Le résultat est semblable à ceci :
NAME DISK_SIZE_GB SRC_DISK STATUS wordpress-1-vm-2018-07-18-120044 10 us-central1-c/disks/wordpress-1-vm READY wordpress-1-vm-2018-07-18-120749 10 us-central1-c/disks/wordpress-1-vm READY wordpress-1-vm-2018-07-18-125138 10 us-central1-c/disks/wordpress-1-vm READY
Sélectionnez un instantané et créez un disque persistant de démarrage autonome. Remplacez les espaces réservés entre crochets par vos propres valeurs.
gcloud compute disks create [DISK_NAME] --source-snapshot [SNAPSHOT_NAME] \ --zone=[ZONE]
Où :
DISK_NAME
est le nom du nouveau disque persistant de démarrage autonome.SNAPSHOT_NAME
est l'instantané sélectionné dans la première colonne du résultat précédent.ZONE
est la zone de calcul dans laquelle le disque sera créé.
Créez une instance à l'aide du disque de démarrage. Remplacez
[INSTANCE_NAME]
par le nom de l'instance que vous souhaitez créer.gcloud compute instances create [INSTANCE_NAME] --disk name=[DISK_NAME],boot=yes \ --zone=ZONE --tags=wordpress-1-tcp-443,wordpress-1-tcp-80
Avec les deux tags spécifiés dans la commande, l'instance est automatiquement autorisée à recevoir le trafic entrant sur les ports 443 et 80 en raison des règles de pare-feu préexistantes créées pour l'instance WordPress initiale.
Prenez note de l'adresse IP externe de la nouvelle instance renvoyée par la commande précédente.
Vérifiez que WordPress est en cours d'exécution sur l'instance créée. Insérez l'adresse IP externe dans un nouvel onglet de navigateur. La page de destination par défaut de WordPress s'affiche.
Vous pouvez également créer une instance à l'aide d'un instantané dans la console :
Dans Cloud Console, accédez à la page "Instantanés" :
Cliquez sur l'instantané le plus récent.
Cliquez sur Créer une instance.
Dans le formulaire Nouvelle instance de VM, cliquez sur Gestion, sécurité, disques, mise en réseau, location unique, puis sur Réseau.
Ajoutez
wordpress-1-tcp-443
etwordpress-1-tcp-80
dans le champ Tags réseau en appuyant sur Entrée après chaque tag. Consultez les informations ci-dessus pour obtenir une explication de ces tags.Cliquez sur Créer.
Une nouvelle instance basée sur le dernier instantané est créée et prête à diffuser du contenu.
Ouvrez la page Instances Compute Engine et notez l'adresse IP externe de la nouvelle instance.
Vérifiez que WordPress est en cours d'exécution sur l'instance créée. Accédez à l'adresse IP externe à partir d'un nouvel onglet de navigateur.
Pour en savoir plus, reportez-vous à la section Créer une instance à partir d'un instantané.
Effectuer un nettoyage
- Dans Cloud Console, accédez à la page Gérer les ressources.
- Dans la liste des projets, sélectionnez le projet que vous souhaitez supprimer, puis cliquez sur Supprimer.
- Dans la boîte de dialogue, saisissez l'ID du projet, puis cliquez sur Arrêter pour supprimer le projet.
Étape suivante
- Prenez connaissance des bonnes pratiques pour les entreprises.
- Découvrez comment concevoir et mettre en œuvre un plan de reprise après sinistre.
- Découvrez les concepts de Cloud Composer.
- Apprenez-en plus sur Apache Airflow.
- Testez d'autres fonctionnalités de Google Cloud. Découvrez nos tutoriels.