Infrastruktur mit Cloud Composer automatisieren

In dieser Anleitung wird eine Möglichkeit zur Automatisierung der Cloudinfrastruktur mit Cloud Composer beschrieben. Das Beispiel zeigt, wie Sie automatisierte Sicherungen von Compute Engine-VM-Instanzen planen.

Cloud Composer ist ein vollständig verwalteter Dienst zur Workflow-Orchestrierung in Google Cloud. Mit Cloud Composer können Sie Workflows mit einer Python API erstellen, ihre automatische Ausführung planen oder sie manuell starten. Die Ausführung der Aufgaben können Sie in Echtzeit über eine grafische UI überwachen.

Cloud Composer basiert auf Apache Airflow. Grundlage der von Google betriebenen Open Source-Orchestrierungsplattform ist ein Cluster in Google Kubernetes Engine (GKE). Dieser Cluster verwaltet die Airflow-Worker und bietet zahlreiche Möglichkeiten für die Einbindung anderer Google Cloud-Produkte.

Diese Anleitung richtet sich an Betreiber, IT-Administratoren und Entwickler, die ihre Infrastruktur automatisieren und einen tiefen Einblick in die Kernfunktionen von Cloud Composer erhalten möchten. Die Anleitung ist nicht als Leitfaden für die Notfallwiederherstellung (Disaster Recovery, DR) auf Unternehmensebene oder als Best Practices-Leitfaden für Sicherungen gedacht. Informationen zum Erstellen eines DR-Plans für Ihr Unternehmen finden Sie im Leitfaden zur Planung der Notfallwiederherstellung.

Architektur definieren

Zum Definieren von Cloud Composer-Workflows wird ein gerichteter azyklischer Graph (Directed Acyclic Graph, DAG) erstellt. In Airflow ist ein DAG eine Sammlung von Aufgaben, deren Anordnung die gerichteten Beziehungen widerspiegelt. In dieser Anleitung erfahren Sie, wie Sie einen Airflow-Workflow definieren, mit dem eine VM-Instanz von Compute Engine in regelmäßigen Abständen mithilfe von Snapshots des nichtflüchtigen Speichers gesichert wird.

Die in diesem Beispiel verwendete Compute Engine-VM ist eine Instanz mit einem verknüpften nichtflüchtigen Bootspeicher. Der Cloud Composer-Sicherungsworkflow ruft die Compute Engine API gemäß den später beschriebenen Snapshot-Richtlinien auf, um die Instanz zu stoppen, einen Snapshot des nichtflüchtigen Speichers zu erstellen und die Instanz neu zu starten. Zwischen diesen Aufgaben wartet der Workflow vor dem Fortfahren, bis jeder Vorgang abgeschlossen ist.

Im folgenden Diagramm erhalten Sie einen Überblick der Architektur:

Architektur zur Automatisierung der Infrastruktur

Bevor Sie mit der Anleitung beginnen, sehen Sie im nächsten Abschnitt, wie Sie eine Cloud Composer-Umgebung erstellen. Der Vorteil dieser Umgebung besteht darin, dass mehrere Google Cloud-Produkte verwendet werden, Sie aber nicht jedes einzeln konfigurieren müssen.

  • Cloud Storage: Der Airflow-DAG, das Plug-in und die Logs werden in einem Cloud Storage-Bucket gespeichert.
  • Google Kubernetes Engine: Die Airflow-Plattform basiert auf einer Mikrodienstarchitektur und kann in GKE ausgeführt werden.
  • Airflow-Worker laden Plug-in- und Workflowdefinitionen aus Cloud Storage und führen jede Aufgabe über die Compute Engine API aus.
  • Der Airflow-Scheduler stellt sicher, dass Sicherungen im konfigurierten Intervall und in der richtigen Reihenfolge ausgeführt werden.
  • Redis wird als Message Broker zwischen Airflow-Komponenten verwendet.
  • Cloud SQL Proxy wird für die Kommunikation mit dem Metadaten-Repository verwendet.
  • Cloud SQL und App Engine Flex: Cloud Composer verwendet außerdem eine Cloud SQL-Instanz für Metadaten und eine App Engine Flex-Anwendung, die die Airflow-UI bedient. Diese Ressourcen sind im Diagramm nicht dargestellt, da sie in einem separaten von Google verwalteten Projekt ausgeführt werden.
  • Weitere Informationen finden Sie in der Übersicht über Cloud Composer (nur auf Englisch verfügbar).

    Workflow skalieren

    In dieser Anleitung wird ein einfacher Anwendungsfall vorgestellt: Das Erstellen eines Snapshots einer einzelnen virtuellen Maschine mit einem festen Zeitplan. Ein reales Szenario kann jedoch Hunderte von VMs unterschiedlicher Organisationsbereiche oder verschiedene Systemebenen umfassen, die unterschiedliche Sicherungszeitpläne erfordern. Die Skalierung gilt nicht nur für unser Beispiel mit Compute Engine-VMs, sondern auch für jede Infrastrukturkomponente, für die ein geplanter Prozess ausgeführt werden muss.

    Cloud Composer eignet sich hervorragend für diese komplexen Szenarien, da es eine vollwertige Workflow-Engine ist, die auf Apache Airflow basiert und in der Cloud gehostet wird, und nicht nur eine Alternative zu Cloud Scheduler oder cron.

    Airflow-DAGs sind flexible Darstellungen eines Workflows und passen sich den Erfordernissen der realen Umgebung an, obwohl sie weiterhin von einer einzigen Codebasis aus ausgeführt werden. Zum Erstellen geeigneter DAGs für Ihren Anwendungsfall können Sie die beiden folgenden Ansätze kombinieren:

    • Sie erstellen eine DAG-Instanz für Gruppen von Infrastrukturkomponenten, bei denen der Prozess mit demselben Zeitplan gestartet werden kann.
    • Sie erstellen unabhängige DAG-Instanzen für Gruppen von Infrastrukturkomponenten, die eigene Zeitpläne erfordern.

    Ein DAG kann Komponenten parallel verarbeiten. Dabei muss entweder mit einer Aufgabe für jede Komponente ein asynchroner Vorgang gestartet werden oder Sie erstellen für die Verarbeitung jeder Komponente einen Branch. Sie können DAGs dynamisch aus Code erstellen, um Branches und Aufgaben nach Bedarf hinzuzufügen oder zu entfernen.

    Sie können auch Abhängigkeiten zwischen Anwendungsebenen innerhalb desselben DAG modellieren. Dies ist beispielsweise nützlich, wenn Sie vor dem Stoppen von Anwendungsserverinstanzen alle Webserverinstanzen anhalten möchten.

    Diese Optimierungen werden in der vorliegenden Anleitung nicht erläutert.

    Best Practices für Snapshots verwenden

    Nichtflüchtiger Speicher ist ein dauerhafter Blockspeicher, der an eine VM-Instanz angehängt und als primäres Bootlaufwerk für die Instanz oder als sekundäres Nicht-Bootlaufwerk für kritische Daten verwendet werden kann. Nichtflüchtige Speicher sind hochverfügbar – Bei jedem Schreibvorgang werden drei Replikate geschrieben, von denen Google Cloud-Kunden jedoch nur ein Replikat berechnet wird.

    Ein Snapshot ist eine exakte Kopie eines nichtflüchtigen Speichers zu einer bestimmten Zeit. Snapshots sind inkrementell und komprimiert und werden transparent im Cloud Storage gespeichert.

    Snapshots können bei laufenden Anwendungen von jedem nichtflüchtigen Speicher erstellt werden. Sie enthalten niemals einen unvollständigen Block. Wenn jedoch gerade ein Schreibvorgang für mehrere Blöcke ausgeführt wird, während auf dem Back-End eine Anfrage zur Erstellung eines Snapshots eingeht, enthält der Snapshot möglicherweise nur einen Teil der aktualisierten Blöcke. Sie können mit diesen Inkonsistenzen genauso wie bei einem nicht ordnungsgemäßen Herunterfahren verfahren.

    Für konsistente Snapshots wird empfohlen, diese Richtlinien zu befolgen:

    • Minimieren oder vermeiden Sie während der Snapshot-Erstellung Schreibvorgänge auf dem Laufwerk. Planen Sie Sicherungen beispielsweise nach Möglichkeit außerhalb von Spitzenzeiten.
    • Halten Sie bei sekundären Nicht-Bootlaufwerken Anwendungen und Prozesse an, die Daten schreiben und das Dateisystem einfrieren oder dessen Bereitstellung aufheben.
    • Bei Bootlaufwerken ist es weder sicher noch zulässig, das Root-Volume einzufrieren. Sie können aber die VM-Instanz anhalten, bevor Sie einen Snapshot erstellen.

      Verwenden Sie eine hochverfügbare Architektur, um Dienstausfälle durch Einfrieren oder Anhalten einer virtuellen Maschine zu vermeiden. Weitere Informationen finden Sie unter Szenarien der Notfallwiederherstellung von Anwendungen.

    • Verwenden Sie eine konsistente Namenskonvention für die Snapshots. Sie können beispielsweise einen Zeitstempel mit einer entsprechenden Genauigkeit verwenden, der mit dem Namen der Instanz, des Laufwerks und der Zone verkettet ist.

    Weitere Informationen zum Erstellen konsistenter Snapshots finden Sie in diesem Artikel (nur auf Englisch verfügbar).

    Ziele

    • Benutzerdefinierte Airflow-Operatoren und einen Sensor für Compute Engine erstellen
    • Einen Cloud Composer-Workflow mit den Airflow-Operatoren und einem Sensor erstellen
    • Den Workflow so planen, dass in regelmäßigen Intervallen eine Compute Engine-Instanz gesichert wird

    Kosten

    Mit dem Preisrechner können Sie eine Kostenschätzung für die geplante Nutzung vornehmen.

    Vorbereitung

    1. Melden Sie sich bei Ihrem Google-Konto an.

      Wenn Sie noch kein Konto haben, melden Sie sich hier für ein neues Konto an.

    2. Wählen Sie in der Google Cloud Console auf der Seite der Projektauswahl ein Google Cloud-Projekt aus oder erstellen Sie eines.

      Zur Projektauswahl

    3. Die Abrechnung für das Cloud-Projekt muss aktiviert sein. So prüfen Sie, ob die Abrechnung für Ihr Projekt aktiviert ist.

    4. Erstellen Sie eine Cloud Composer-Umgebung. Wählen Sie zur Minimierung der Kosten eine Laufwerkgröße von 20 GB aus.

      Zur Seite "Umgebungen erstellen"

      Das Bereitstellen der Cloud Composer-Umgebung erfolgt normalerweise innerhalb von etwa 15 Minuten, kann jedoch bis zu eine Stunde dauern.

    5. Der vollständige Code für diese Anleitung ist auf GitHub verfügbar. Damit Sie die Dateien im weiteren Verlauf prüfen können, öffnen Sie das Repository in Cloud Shell:

      Zu Cloud Shell
    6. Führen Sie im Basisverzeichnis der Cloud Shell-Konsole den folgenden Befehl aus:
      git clone https://github.com/GoogleCloudPlatform/composer-infra-python.git

    Nach Abschluss dieser Anleitung können Sie weitere Kosten vermeiden, indem Sie die erstellten Ressourcen löschen. Weitere Informationen finden Sie unter Bereinigen.

    Compute Engine-Beispielinstanz einrichten

    Erstellen Sie zuerst die zu sichernde VM-Beispielinstanz von Compute Engine. Auf dieser Instanz wird das Open-Source-Content-Management-System WordPress ausgeführt.

    Führen Sie zum Erstellen der WordPress-Instanz in Compute Engine die folgenden Schritte aus:

    1. Rufen Sie im Google Cloud Marketplace die Startseite für das von Bitnami zertifizierte WordPress auf.
    2. Klicken Sie auf Starten.
    3. Ein Pop-up-Fenster mit einer Liste Ihrer Projekte wird angezeigt. Wählen Sie das zuvor für diese Anleitung erstellte Projekt aus.

      Google Cloud konfiguriert die erforderlichen APIs in Ihrem Projekt und zeigt nach kurzer Wartezeit eine Ansicht mit den verschiedenen Konfigurationsoptionen für Ihre WordPress Compute Engine-Instanz.

    4. Sie können den Typ des Bootlaufwerks in SSD ändern, um die Startgeschwindigkeit der Instanz zu erhöhen.

    5. Klicken Sie auf Deploy (Bereitstellen).

      Daraufhin wird der Bildschirm "Deployment Manager" mit dem Status der Bereitstellung angezeigt.

      Durch das WordPress Deployment Manager-Skript werden die WordPress Compute Engine-Instanz und zwei Firewallregeln erstellt, mit denen die Instanz über die Ports 80 und 443 für TCP-Traffic erreichbar ist. Dieser Vorgang kann einige Minuten dauern. Der Fortschritt der Bereitstellung jedes Elements wird durch ein Radsymbol angezeigt.

      Nach Abschluss des Vorgangs stellt die WordPress-Instanz den Standardinhalt auf der Website-URL bereit. Auf dem Bildschirm "Deployment Manager" werden im Feld Site address die URL der Website und im Feld Admin URL die URL der Administrationskonsole einschließlich Nutzer, Passwort, Dokumentationslinks und Vorschlägen für nächste Schritte angezeigt.

      Deployment Manager mit bereitgestellter Instanz

    6. Klicken Sie auf die Webadresse, um zu prüfen, ob die WordPress-Instanz betriebsbereit ist. Sie sollten eine Standard-WordPress-Blogseite sehen.

    Die Compute Engine-Beispielinstanz ist jetzt bereit. Als Nächstes konfigurieren Sie einen automatischen inkrementellen Sicherungsprozess für den nichtflüchtigen Speicher der Instanz.

    Benutzerdefinierte Airflow-Operatoren erstellen

    Zum Sichern des nichtflüchtigen Speichers der Testinstanz können Sie einen Airflow-Workflow erstellen, mit dem die Instanz gestoppt, ein Snapshot des nichtflüchtigen Speichers erstellt und die Instanz neu gestartet wird. Jede dieser Aufgaben wird als Code mit einem benutzerdefinierten Airflow-Operator definiert. Der Code der Operatoren wird dann in einem Airflow-Plug-in zusammengefasst.

    In diesem Abschnitt erfahren Sie, wie Sie benutzerdefinierte Airflow-Operatoren erstellen, die zur Steuerung des Instanzlebenszyklus die Compute Engine Python-Clientbibliothek aufrufen. Dies ist auch auf andere Arten möglich:

    • Verwenden Sie den BashOperator von Airflow, um gcloud compute-Befehle auszuführen.
    • Verwenden Sie den HTTPOperator von Airflow, um direkte HTTP-Aufrufe an die Compute Engine REST API auszuführen.
    • Verwenden Sie den PythonOperator von Airflow, um beliebige Python-Funktionen aufzurufen, ohne benutzerdefinierte Operatoren zu definieren.

    Diese Alternativen werden in der vorliegenden Anleitung nicht erläutert.

    Aufrufe an die Compute Engine API autorisieren

    Die benutzerdefinierten Operatoren, die Sie in dieser Anleitung erstellen, verwenden für Aufrufe der Compute Engine API die Python-Clientbibliothek. Anfragen an die API müssen authentifiziert und autorisiert werden. Hierfür empfiehlt sich die Verwendung von Standardanmeldedaten für Anwendungen (Application Default Credentials, ADC)

    Die ADC-Strategie wird bei jedem Aufruf von einer Clientbibliothek aus angewendet:

    1. Die Bibliothek überprüft, ob in der Umgebungsvariable GOOGLE_APPLICATION_CREDENTIALS ein Dienstkonto angegeben ist.
    2. Wenn kein Dienstkonto angegeben ist, verwendet die Bibliothek das von Compute Engine oder GKE bereitgestellte Standarddienstkonto.

    Wenn diese beiden Methoden fehlschlagen, tritt ein Fehler auf.

    Für die Airflow-Operatoren in dieser Anleitung findet die zweite Methode Anwendung. Wenn Sie die Cloud Composer-Umgebung erstellen, wird ein GKE-Cluster bereitgestellt. Auf den Knoten dieses Clusters werden Airflow-Worker-Pods ausgeführt. Diese Worker wiederum führen den Workflow mit den von Ihnen definierten benutzerdefinierten Operatoren aus. Da Sie beim Erstellen der Umgebung kein Dienstkonto angegeben haben, wird für die ADC-Strategie das Standarddienstkonto für die GKE-Clusterknoten verwendet.

    GKE-Clusterknoten sind Compute Engine-Instanzen. Daher ist es einfach, die dem Compute Engine-Standarddienstkonto zugeordneten Anmeldedaten im Operatorcode abzurufen.

    def get_compute_api_client(self):
      credentials = GoogleCredentials.get_application_default()
      return googleapiclient.discovery.build(
          'compute', 'v1', cache_discovery=False, credentials=credentials)

    Für diesen Code werden die Standardanmeldedaten der Anwendung verwendet, um einen Python-Client zu erstellen, der Anfragen an die Compute Engine API sendet. In den folgenden Abschnitten referenzieren Sie diesen Code bei der Erstellung jedes Airflow-Operators.

    Alternativ zur Verwendung des Compute Engine-Standarddienstkontos können Sie ein Dienstkonto erstellen und in der Airflow-Verwaltungskonsole als Verbindung konfigurieren. Diese Methode wird auf der Seite Airflow-Verbindungen verwalten beschrieben und ermöglicht eine detailliertere Zugriffssteuerung auf Google Cloud-Ressourcen. Diese Alternative wird in der vorliegenden Anleitung nicht erläutert.

    Compute Engine-Instanz sicher herunterfahren

    In diesem Abschnitt wird erläutert, wie Sie den ersten benutzerdefinierten Airflow-Operator StopInstanceOperator erstellen. Mit diesem Operator rufen Sie die Compute Engine API auf, um die Compute Engine-Instanz zu stoppen, auf der WordPress ausgeführt wird:

    1. Verwenden Sie in Cloud Shell einen Texteditor wie Nano oder Vim, um die Datei gce_commands_plugin.py zu öffnen:

      vi $HOME/composer-infra-python/no_sensor/plugins/gce_commands_plugin.py
      
    2. Prüfen Sie die am Anfang der Datei aufgeführten Importvorgänge:

      import datetime
      import logging
      import time
      from airflow.models import BaseOperator
      from airflow.plugins_manager import AirflowPlugin
      from airflow.utils.decorators import apply_defaults
      import googleapiclient.discovery
      from oauth2client.client import GoogleCredentials

      Interessant sind folgende Importvorgänge:

      • BaseOperator: Basisklasse, die von allen benutzerdefinierten Airflow-Operatoren übernommen werden muss.
      • AirflowPlugin: Basisklasse zum Erstellen einer Gruppe von Operatoren, die ein Plug-in bilden.
      • apply_defaults: Funktions-Decorator, der Argumente mit Standardwerten füllt, wenn sie nicht im Operator-Konstruktor angegeben sind.
      • GoogleCredentials: Klasse, die zum Abrufen der Standardanmeldedaten für Anwendungen verwendet wird.
      • googleapiclient.discovery: Einstiegspunkt der Clientbibliothek, der die Erkennung der zugrunde liegenden Google APIs ermöglicht. In diesem Fall erstellt die Clientbibliothek eine Ressource für die Interaktion mit der Compute Engine API.
    3. Sehen Sie sich als Nächstes die Klasse StopInstanceOperator unter den Importen an:

      class StopInstanceOperator(BaseOperator):
        """Stops the virtual machine instance."""
      
        @apply_defaults
        def __init__(self, project, zone, instance, *args, **kwargs):
          self.compute = self.get_compute_api_client()
          self.project = project
          self.zone = zone
          self.instance = instance
          super(StopInstanceOperator, self).__init__(*args, **kwargs)
      
        def get_compute_api_client(self):
          credentials = GoogleCredentials.get_application_default()
          return googleapiclient.discovery.build(
              'compute', 'v1', cache_discovery=False, credentials=credentials)
      
        def execute(self, context):
          logging.info('Stopping instance %s in project %s and zone %s',
                       self.instance, self.project, self.zone)
          self.compute.instances().stop(
              project=self.project, zone=self.zone, instance=self.instance).execute()
          time.sleep(90)

      Die Klasse StopInstanceOperator hat drei Methoden:

      • __init__: Klassenkonstruktor. Dieser empfängt den Projektnamen, die Zone, in der die Instanz ausgeführt wird, und den Namen der Instanz, die Sie stoppen möchten. Außerdem wird durch Aufrufen von get_compute_api_client die Variable self.compute initialisiert.
      • get_compute_api_client: Hilfsmethode, die eine Instanz der Compute Engine API zurückgibt. Für die Authentifizierung bei der API und die Autorisierung nachfolgender Aufrufe werden die von GoogleCredentials bereitgestellten Standardanmeldedaten für Anwendungen (Application Default Credentials, ADC) verwendet.
      • execute: Hauptoperatormethode, überschrieben von BaseOperator. In Airflow wird mit dieser Methode der Operator ausgeführt. Dabei wird in die Logs eine Infonachricht geschrieben. Anschließend wird die Compute Engine API aufgerufen, um die Compute Engine-Instanz zu stoppen, die mit den drei im Konstruktor empfangenen Parametern angegeben wurde. Die abschließende Funktion sleep() wird ausgeführt, nachdem die Instanz gestoppt wurde. In einer Produktionsumgebung müssen Sie eine deterministischere Methode wie etwa eine Querkommunikation zwischen Operatoren verwenden. Diese Technik wird später in der Anleitung beschrieben.

    Mit der Methode stop() der Compute Engine API wird die VM-Instanz ordnungsgemäß heruntergefahren. Das Betriebssystem führt die Skripts init.d zum Herunterfahren aus. Dies gilt auch für das WordPress-Skript unter /etc/init.d/bitnami. Über dieses Skript erfolgt auch der Start von WordPress, wenn die virtuelle Maschine wieder gestartet wird. Sie können die Dienstdefinition mit der Konfiguration zum Herunterfahren und Starten unter /etc/systemd/system/bitnami.service. prüfen.

    Eindeutig benannte inkrementelle Sicherungs-Snapshots erstellen

    In diesem Abschnitt wird der zweite benutzerdefinierte Operator SnapshotDiskOperator erstellt. Dieser Operator erstellt einen Snapshot des nichtflüchtigen Speichers der Instanz.

    Sehen Sie sich in der Datei gce_commands_plugin.py, die Sie im vorherigen Abschnitt geöffnet haben, die Klasse SnapshotDiskOperator an:

    class SnapshotDiskOperator(BaseOperator):
      """Takes a snapshot of a persistent disk."""
    
      @apply_defaults
      def __init__(self, project, zone, instance, disk, *args, **kwargs):
        self.compute = self.get_compute_api_client()
        self.project = project
        self.zone = zone
        self.instance = instance
        self.disk = disk
        super(SnapshotDiskOperator, self).__init__(*args, **kwargs)
    
      def get_compute_api_client(self):
        credentials = GoogleCredentials.get_application_default()
        return googleapiclient.discovery.build(
            'compute', 'v1', cache_discovery=False, credentials=credentials)
    
      def generate_snapshot_name(self, instance):
        # Snapshot name must match regex '(?:[a-z](?:[-a-z0-9]{0,61}[a-z0-9])?)'
        return ('' + self.instance + '-' +
                datetime.datetime.now().strftime('%Y-%m-%d-%H%M%S'))
    
      def execute(self, context):
        snapshot_name = self.generate_snapshot_name(self.instance)
        logging.info(
            ("Creating snapshot '%s' from: {disk=%s, instance=%s, project=%s, "
             "zone=%s}"),
            snapshot_name, self.disk, self.instance, self.project, self.zone)
        self.compute.disks().createSnapshot(
            project=self.project, zone=self.zone, disk=self.disk,
            body={'name': snapshot_name}).execute()
        time.sleep(120)

    Die Klasse SnapshotDiskOperator hat folgende Methoden:

    • __init__: Klassenkonstruktor. Dieser Konstruktor ähnelt dem in der Klasse StopInstanceOperator, erhält jedoch zusätzlich zu den Projekt-, Zonen- und Instanznamen auch den Namen des Laufwerks, von dem der Snapshot erstellt wird. Dies liegt daran, dass an eine Instanz mehrere nichtflüchtige Speicher angehängt sein können.
    • generate_snapshot_name: Mit dieser Beispielmethode wird für jeden Snapshot ein einfacher einmaliger Name erstellt, der aus dem Namen der Instanz, dem Datum und der Uhrzeit mit einer Genauigkeit von einer Sekunde gebildet wird. Passen Sie den Namen Ihren Anforderungen entsprechend an. Sie können beispielsweise den Namen des Laufwerks angeben, wenn mehrere Laufwerke an eine Instanz angehängt sind, oder die Genauigkeit der Uhrzeit erhöhen, um Ad-hoc-Anfragen zur Snapshot-Erstellung zu unterstützen.
    • execute: Die von BaseOperator überschriebene Hauptoperatormethode. Bei Ausführung durch den Airflow-Worker wird mit der Methode generate_snapshot_name ein Snapshot-Name generiert. Anschließend wird eine Infonachricht protokolliert. Dann wird die Compute Engine API aufgerufen, um den Snapshot mit den im Konstruktor empfangenen Parametern zu erstellen.

    Compute Engine-Instanz starten

    In diesem Abschnitt erstellen Sie den dritten und letzten benutzerdefinierten Operator StartInstanceOperator. Mit diesem Operator wird eine Compute Engine-Instanz neu gestartet.

    Sehen Sie sich im unteren Bereich der zuvor geöffneten Datei gce_commands_plugin.py die Klasse SnapshotDiskOperator an:

    class StartInstanceOperator(BaseOperator):
      """Starts a virtual machine instance."""
    
      @apply_defaults
      def __init__(self, project, zone, instance, *args, **kwargs):
        self.compute = self.get_compute_api_client()
        self.project = project
        self.zone = zone
        self.instance = instance
        super(StartInstanceOperator, self).__init__(*args, **kwargs)
    
      def get_compute_api_client(self):
        credentials = GoogleCredentials.get_application_default()
        return googleapiclient.discovery.build(
            'compute', 'v1', cache_discovery=False, credentials=credentials)
    
      def execute(self, context):
        logging.info('Starting instance %s in project %s and zone %s',
                     self.instance, self.project, self.zone)
        self.compute.instances().start(
            project=self.project, zone=self.zone, instance=self.instance).execute()
        time.sleep(20)

    Die Klasse StartInstanceOperator hat folgende Methoden:

    • __init__: Klassenkonstruktor. Ähnlich wie der Konstruktor in der Klasse StopInstanceOperator.
    • execute: Die von BaseOperator überschriebene Hauptoperatormethode. Der Unterschied zu den vorherigen Operatoren besteht im Aufrufen der entsprechenden Compute Engine API, um die mit den Parametern im Konstruktor angegebene Instanz zu starten.

    Airflow-Workflow definieren

    Sie haben zuvor ein Airflow-Plug-in mit drei Operatoren definiert. Diese Operatoren legen die Aufgaben fest, die Bestandteil eines Airflow-Workflows sind. Der hier dargestellte Workflow ist einfach und linear. Airflow-Workflows können jedoch komplexe gerichtete azyklische Graphen (Directed Acyclic Graphs, DAGs) sein.

    In diesem Abschnitt wird die Plug-in-Klasse erstellt, mit der die drei Operatoren verfügbar werden. Anschließend wird der DAG mithilfe der Operatoren erstellt, für Cloud Composer bereitgestellt und ausgeführt.

    Plug-in erstellen

    Bisher enthält die Datei gce_commands_plugin.py die Operatoren "start", "snapshot" und "stop". Damit Sie diese Operatoren in einem Workflow verwenden können, müssen Sie sie in eine Plug-in-Klasse aufnehmen.

    1. Beachten Sie die Klasse GoogleComputeEnginePlugin unten in der Datei gce_commands_plugin.py:

      class GoogleComputeEnginePlugin(AirflowPlugin):
        """Expose Airflow operators."""
      
        name = 'gce_commands_plugin'
        operators = [StopInstanceOperator, SnapshotDiskOperator,
                     StartInstanceOperator]

      Diese Klasse, die von AirflowPlugin übernommen wird, gibt dem Plug-in den internen Namen gce_commands_plugin und fügt die drei Operatoren hinzu.

    2. Schließen Sie die Datei gce_commands_plugin.py.

    Gerichteten azyklischen Graphen konfigurieren

    Der DAG definiert den von Airflow ausgeführten Workflow. Damit der DAG weiß, welches Laufwerk gesichert werden soll, definieren Sie, an welche Compute Engine-Instanz das Laufwerk angehängt ist, in welcher Zone die Instanz ausgeführt wird und in welchem Projekt alle Ressourcen verfügbar sind.

    Sie haben die Möglichkeit, die Variablen im DAG-Quellcode hartzucodieren. Es empfiehlt sich jedoch, sie als Airflow-Variablen zu definieren. Dadurch können Sie Konfigurationsänderungen zentral und von Codebereitstellungen unabhängig verwalten.

    Definieren Sie die DAG-Konfiguration:

    1. Legen Sie in Cloud Shell den Ort der Cloud Composer-Umgebung fest:

      LOCATION=[CLOUD_ENV_LOCATION]
      

      Der Speicherort ist die Compute Engine-Region, in der sich die Cloud Composer-Umgebung befindet, z. B. us-central1 oder europe-west1. Diese wurde bei der Erstellung der Umgebung festgelegt und ist auf der Seite der Cloud Composer-Konsole angegeben.

    2. Legen Sie den Namen der Cloud Composer-Umgebung fest:

      ENVIRONMENT=$(gcloud composer environments list \
          --format="value(name)" --locations $LOCATION)
      

      Mit dem Parameter --format wird nur die Spalte name aus der resultierenden Tabelle ausgewählt. Sie können davon ausgehen, dass nur eine Umgebung erstellt wurde.

    3. Erstellen Sie in Airflow die Variable PROJECT mit dem Namen des aktuellen Google Cloud-Projekts:

      gcloud composer environments run $ENVIRONMENT --location $LOCATION \
          variables -- --set PROJECT $(gcloud config get-value project)
      

      Dabei gilt:

      • gcloud composer environments run wird zum Ausführen von Airflow-Befehlszeilenbefehlen verwendet.
      • Der Airflow-Befehl variables setzt die Airflow-Variable PROJECT auf den von gcloud config zurückgegebenen Wert.
    4. Erstellen Sie in Airflow die Variable INSTANCE mit dem Namen der WordPress-Instanz:

      gcloud composer environments run $ENVIRONMENT --location $LOCATION \
          variables -- --set INSTANCE \
          $(gcloud compute instances list \
          --format="value(name)" --filter="name~'.*wordpress.*'")
      

      Dieser Befehl verwendet den Parameter --filter, um nur die Instanz auszuwählen, deren name mit einem regulären Ausdruck übereinstimmt, der den String wordpress enthält. Bei diesem Ansatz wird davon ausgegangen, dass es nur eine solche Instanz gibt und dass der Name der Instanz und des Laufwerks "wordpress" enthält. Dies ist der Fall, wenn Sie die Standardeinstellungen übernommen haben.

    5. Erstellen Sie in Airflow die Variable ZONE mit der Zone der WordPress-Instanz:

      gcloud composer environments run $ENVIRONMENT --location $LOCATION \
          variables -- --set ZONE \
          $(gcloud compute instances list \
          --format="value(zone)" --filter="name~'.*wordpress.*'")
      
    6. Erstellen Sie in Airflow die Variable DISK mit dem Namen des nichtflüchtigen Speichers, der an die WordPress-Instanz angehängt ist:

      gcloud composer environments run $ENVIRONMENT --location $LOCATION \
          variables -- --set DISK \
          $(gcloud compute disks list \
          --format="value(name)" --filter="name~'.*wordpress.*'")
      
    7. Prüfen Sie, ob die Airflow-Variablen ordnungsgemäß erstellt wurden:

      1. Rufen Sie in der Cloud Console die Seite "Cloud Composer" auf.

        Zur Seite "Cloud Composer"

      2. Klicken Sie in der Spalte Airflow web server auf den Link Airflow. Daraufhin wird ein neuer Tab mit der Startseite des Airflow-Webservers geöffnet.

      3. Klicken Sie auf Admin und dann auf Variables (Variablen).

        Die Liste zeigt die DAG-Konfigurationsvariablen.

        DAG-Konfigurationsvariablen

    Gerichteten azyklischen Graphen erstellen

    Die DAG-Definition ist in einer dedizierten Python-Datei angegeben. Erstellen Sie als Nächstes den DAG. Verketten Sie zu diesem Zweck die drei Operatoren des Plug-ins.

    1. Verwenden Sie in Cloud Shell einen Texteditor wie Nano oder Vim, um die Datei backup_vm_instance.py zu öffnen:

      vi $HOME/composer-infra-python/no_sensor/dags/backup_vm_instance.py
      
    2. Prüfen Sie die am Anfang der Datei aufgeführten Importvorgänge:

      import datetime
      from airflow import DAG
      from airflow.models import Variable
      from airflow.operators import SnapshotDiskOperator
      from airflow.operators import StartInstanceOperator
      from airflow.operators import StopInstanceOperator
      from airflow.operators.dummy_operator import DummyOperator

      Zusammenfassung der Importvorgänge:

      • DAG ist der in Airflow definierte gerichtete azyklische Graph (Directed Acyclic Graph – DAG).
      • DummyOperator wird verwendet, um die No-Op-Operatoren für den Anfang und das Ende zu erstellen, wodurch die Workflow-Visualisierung verbessert wird. In komplexeren DAGs kann DummyOperator verwendet werden, um Branches zu verbinden und SubDAGs zu erstellen.
      • Der DAG verwendet die drei Operatoren, die Sie in den vorherigen Abschnitten definiert haben.
    3. Definieren Sie die Werte der Parameter, die an Operatorkonstruktoren übergeben werden sollen:

      INTERVAL = '@daily'
      START_DATE = datetime.datetime(2018, 7, 16)
      PROJECT = Variable.get('PROJECT')
      ZONE = Variable.get('ZONE')
      INSTANCE = Variable.get('INSTANCE')
      DISK = Variable.get('DISK')

      Dabei gilt:

      • INTERVAL legt fest, wie oft der Sicherungsworkflow ausgeführt wird. Der vorhergehende Code gibt eine tägliche Wiederholung mit einem vorkonfigurierten Airflow-Cronjob an. Wie Sie das Intervall ändern können, lesen Sie auf der Referenzseite für DAG-Ausführungen. Sie können den Workflow auch unabhängig von diesem Zeitplan manuell auslösen.
      • START_DATE definiert den Zeitpunkt, an dem die Sicherungen planmäßig starten. Sie können diesen Wert einfach übernehmen.
      • Die restlichen Werte werden aus den Airflow-Variablen übernommen, die Sie im vorherigen Abschnitt konfiguriert haben.
    4. Erstellen Sie den DAG mit folgendem Code mit einigen der zuvor definierten Parameter. Durch diesen Code erhält der DAG auch einen Namen und eine Beschreibung. Beides wird auf der Cloud Composer-UI angezeigt.

      dag1 = DAG('backup_vm_instance',
                 description='Backup a Compute Engine instance using an Airflow DAG',
                 schedule_interval=INTERVAL,
                 start_date=START_DATE,
                 catchup=False)
    5. Füllen Sie den DAG mit Aufgaben, bei denen es sich um Operatorinstanzen handelt:

      ## Dummy tasks
      begin = DummyOperator(task_id='begin', retries=1, dag=dag1)
      end = DummyOperator(task_id='end', retries=1)
      
      ## Compute Engine tasks
      stop_instance = StopInstanceOperator(
          project=PROJECT, zone=ZONE, instance=INSTANCE, task_id='stop_instance')
      snapshot_disk = SnapshotDiskOperator(
          project=PROJECT, zone=ZONE, instance=INSTANCE,
          disk=DISK, task_id='snapshot_disk')
      start_instance = StartInstanceOperator(
          project=PROJECT, zone=ZONE, instance=INSTANCE, task_id='start_instance')

      Mit diesem Code werden alle für den Workflow erforderlichen Aufgaben instanziiert. Die definierten Parameter werden dabei an die entsprechenden Operatorkonstruktoren übergeben.

      • Die task_id-Werte sind die eindeutigen IDs, die in der Cloud Composer-Benutzeroberfläche angezeigt werden. Sie verwenden diese IDs später für die Datenübergabe zwischen Aufgaben.
      • retries legt fest, wie oft eine Aufgabe wiederholt werden soll, bevor sie fehlschlägt. Bei DummyOperator-Aufgaben werden diese Werte ignoriert.
      • dag=dag gibt an, dass eine Aufgabe an den zuvor erstellten DAG angehängt ist. Dieser Parameter ist nur in der ersten Aufgabe des Workflows erforderlich.
    6. Definieren Sie die Reihenfolge der Aufgaben des Workflow-DAG:

      # Airflow DAG definition
      begin >> stop_instance >> snapshot_disk >> start_instance >> end
      
    7. Schließen Sie die Datei gce_commands_plugin.py.

    Workflow ausführen

    Der durch den Operator-DAG dargestellte Workflow kann jetzt von Cloud Composer ausgeführt werden. Cloud Composer liest die Definition des DAG und des Plug-ins aus einem zugeordneten Cloud Storage-Bucket. Dieser Bucket und die zugehörigen Verzeichnisse dags und plugins wurden beim Erstellen der Cloud Composer-Umgebung automatisch erstellt.

    Sie können den DAG und das Plug-in mit Cloud Shell in den zugehörigen Cloud Storage-Bucket kopieren:

    1. Rufen Sie in Cloud Shell den Bucket-Namen ab:

      BUCKET=$(gsutil ls)
      echo $BUCKET
      

      Es sollte einen einzelnen Bucket mit einem Namen in diesem Format geben: gs://[REGION]-{ENVIRONMENT_NAME]-{ID}-bucket/..

    2. Führen Sie das folgende Skript aus, um die DAG- und Plug-in-Dateien in die entsprechenden Bucket-Verzeichnisse zu kopieren:

      gsutil cp $HOME/composer-infra-python/no_sensor/plugins/gce_commands_plugin.py "$BUCKET"plugins
      gsutil cp $HOME/composer-infra-python/no_sensor/dags/backup_vm_instance.py "$BUCKET"dags
      

      Der Bucket-Name enthält bereits einen abschließenden Schrägstrich, daher wird die Variable $BUCKET in doppelte Anführungszeichen gesetzt.

    3. Rufen Sie in der Cloud Console die Seite "Cloud Composer" auf.

      Zur Seite "Cloud Composer"

    4. Klicken Sie in der Spalte Airflow web server auf den Link Airflow. Daraufhin wird ein neuer Tab mit der Startseite des Airflow-Webservers geöffnet. Warten Sie zwei bis drei Minuten und laden Sie die Seite anschließend neu. Möglicherweise müssen Sie die Seite mehrmals neu laden, bevor sie bereit ist.

      Eine Liste mit dem neu erstellten DAG ähnlich der folgenden wird angezeigt:

      DAG-Liste

      Wenn der Code Syntaxfehler enthält, wird über der DAG-Tabelle eine Meldung angezeigt. Laufzeitfehler werden unter DAG Runs (DAG-Ausführungen) angegeben. Korrigieren Sie alle Fehler, bevor Sie fortfahren. Kopieren Sie die Dateien zu diesem Zweck am einfachsten aus dem GitHub-Repository in den Bucket.

    5. Führen Sie den folgenden Befehl in Cloud Shell aus, um einen detaillierteren Stacktrace anzuzeigen:

      gcloud composer environments run $ENVIRONMENT --location $LOCATION list_dags
      
    6. Der Workflow wird sofort in Airflow ausgeführt. Dies ist in der Spalte DAG Runs (DAG-Ausführungen) zu sehen.

      Sie können den bereits laufenden Workflow manuell mit den folgenden Schritten neu starten:

      1. Klicken Sie in der Spalte Links auf das erste Symbol, Trigger Dag (DAG auslösen). Dieses ist im vorherigen Screenshot mit einem Pfeil markiert.
      2. Klicken Sie im Pop-up-Fenster Sind Sie sicher? auf OK.

        Der Workflow startet nach wenigen Sekunden. Unter DAG Runs (DAG-Ausführungen) wird der neue Lauf durch einen hellgrünen Kreis angezeigt.

    7. Klicken Sie in der Spalte Links auf das Symbol "Graph View". Dieses ist im vorherigen Screenshot mit einem Pfeil markiert.

      Screenshot von Cloud Composer

      Die Diagrammansicht zeigt den Workflow. Die erfolgreich ausgeführten Aufgaben sind dunkelgrün umrandet und die gerade ausgeführte Aufgabe hellgrün. Anstehende Aufgaben haben keinen Rand. Sie können auf die Aufgabe klicken, um Logdetails anzuzeigen und weitere Vorgänge auszuführen.

    8. Zum Verfolgen der Ausführung klicken Sie regelmäßig rechts oben auf die Schaltfläche "Refresh" (Aktualisieren).

      Das wars! Sie haben Ihren ersten Cloud Composer-Workflow abgeschlossen. Nach Abschluss des Workflows wird ein Snapshot des nichtflüchtigen Speichers der Compute Engine-Instanz erstellt.

    9. Prüfen Sie in Cloud Shell, ob der Snapshot erstellt wurde:

      gcloud compute snapshots list
      

      Alternativ können Sie über das Menü der Cloud Console die Seite Compute Engine-Snapshots aufrufen.

      Seite mit Compute Engine-Snapshots

    Es sollte jetzt ein Snapshot angezeigt werden. Durch nachfolgende manuell oder automatisch ausgelöste Workflows werden weitere Snapshots erstellt.

    Snapshots sind inkrementell. Der erste Snapshot ist der größte, da er alle Blöcke des nichtflüchtigen Speichers in komprimierter Form enthält. Nachfolgende Snapshots enthalten nur die Blöcke, die gegenüber dem vorherigen Snapshot geändert wurden, sowie Verweise auf die unveränderten Blöcke. Sie sind daher kleiner als der erste Snapshot, lassen sich schneller erstellen und kosten weniger.

    Wenn ein Snapshot gelöscht wird, werden die darin enthaltenen Daten in den nächsten Snapshot verschoben. Dies sichert die Konsistenz nachfolgender Deltas, die in der Snapshot-Kette gespeichert werden. Nur bei Entfernen aller Snapshots werden alle gesicherten Daten des nichtflüchtigen Speichers gelöscht.

    Benutzerdefinierten Airflow-Sensor erstellen

    Sie haben bei der Ausführung des Workflows möglicherweise festgestellt, dass die einzelnen Schritte etwas zeitaufwendig sind. Dies liegt daran, dass die Operatoren am Ende eine sleep()-Anweisung einfügen, damit die Compute Engine API die Verarbeitung beenden kann, bevor sie mit der nächsten Aufgabe beginnt.

    Dieser Ansatz ist jedoch nicht optimal und kann zu unerwarteten Problemen führen. Wenn beispielsweise die Wartezeit während der Erstellung inkrementeller Snapshots zu lang ist, entstehen unnötige Wartezeiten für bereits abgeschlossene Aufgaben. Die Wartezeit kann aber auch zu kurz sein. In diesem Fall schlägt möglicherweise der gesamte Workflow fehl oder Sie erhalten unzuverlässige Ergebnisse, da die Instanz nicht vollständig angehalten wird oder der Snapshot beim Start der Maschine nicht abgeschlossen ist.

    Sie müssen der nächsten Aufgabe mitteilen können, dass die vorherige Aufgabe erledigt ist. Hierfür bieten sich beispielsweise Airflow-Sensoren an, die den Workflow anhalten, bis bestimmte Kriterien erfüllt sind. Das Kriterium in diesem Fall ist der erfolgreiche Abschluss des vorherigen Compute Engine-Vorgangs.

    Querkommunikationsdaten für verschiedene Aufgaben freigeben

    Wenn Aufgaben miteinander kommunizieren müssen, bietet Airflow einen Mechanismus, der als XCom oder "Querkommunikation" bezeichnet wird. Mit XCom können Aufgaben Nachrichten austauschen, die aus einem Schlüssel, einem Wert und einem Zeitstempel bestehen.

    Am einfachsten lassen sich Nachrichten mit XCom übergeben, wenn ein Operator einen Wert aus seiner Methode execute() zurückgibt. Der Wert kann ein beliebiges Objekt sein, das Python mit dem Pickle-Modul serialisieren kann.

    Die drei in den vorstehenden Abschnitten beschriebenen Operatoren rufen die Compute Engine API auf. Alle diese API-Aufrufe geben ein Vorgangsressourcenobjekt zurück. Diese Objekte dienen zur Verwaltung asynchroner Anfragen wie etwa mit den Airflow-Operatoren. Jedes Objekt hat ein Feld name, mit dem Sie den aktuellen Status des Compute Engine-Vorgangs abfragen können.

    Ändern Sie die Operatoren so, dass der name des Vorgangsressourcenobjekts zurückgegeben wird:

    1. Verwenden Sie in Cloud Shell einen Texteditor wie Nano oder Vim, um die Datei gce_commands_plugin.py zu öffnen, dieses Mal aus dem Verzeichnis sensor/plugins:

      vi $HOME/composer-infra-python/sensor/plugins/gce_commands_plugin.py
      
    2. Beachten Sie in der Ausführungsmethode von StopInstanceOperator, wie der Code

      self.compute.instances().stop(
          project=self.project, zone=self.zone, instance=self.instance).execute()
      time.sleep(90)

      durch diesen Code ersetzt wurde:

      operation = self.compute.instances().stop(
          project=self.project, zone=self.zone, instance=self.instance).execute()
      return operation['name']

      Dabei gilt:

      • Die erste Zeile erfasst den Rückgabewert des API-Aufrufs in der Variable operation.
      • Die zweite Zeile gibt das Vorgangsfeld name der Methode execute() zurück. Mit dieser Anweisung wird der Name mithilfe des Pickle-Moduls serialisiert und in den innerhalb einer Aufgabe gemeinsam genutzten XCom-Bereich übertragen. Der Wert wird später abgerufen, wobei der letzte Wert zuerst verwendet wird.

      Wenn mit einer Aufgabe mehrere Werte übertragen werden sollen, können Sie XCom eine explizite key zuweisen, indem Sie xcom_push() direkt aufrufen, anstatt den Wert zurückzugeben.

    3. In ähnlicher Weise sehen Sie in der Ausführungsmethode von SnapshotDiskOperator, wie der Code

      self.compute.disks().createSnapshot(
          project=self.project, zone=self.zone, disk=self.disk,
          body={'name': snapshot_name}).execute()
      time.sleep(120)

      durch diesen Code ersetzt wurde:

      operation = self.compute.disks().createSnapshot(
          project=self.project, zone=self.zone, disk=self.disk,
          body={'name': snapshot_name}).execute()
      return operation['name']

      Der Code enthält zwei nicht verwandte Namen. Der erste ist der Name des Snapshots und der zweite der Name des Vorgangs.

    4. Sehen Sie sich schließlich in der Ausführungsmethode von StartInstanceOperator an, wie der Code

      self.compute.instances().start(
          project=self.project, zone=self.zone, instance=self.instance).execute()
      time.sleep(20)

      durch diesen Code ersetzt wurde:

      operation = self.compute.instances().start(
          project=self.project, zone=self.zone, instance=self.instance).execute()
      return operation['name']
    5. Jetzt sollten in der Datei gce_commands_plugin.py keine Aufrufe der Methode sleep() vorhanden sein. Suchen Sie in der Datei nach sleep, um das zu prüfen. Prüfen Sie andernfalls die vorherigen Schritte in diesem Abschnitt.

      Da aus dem Code keine Aufrufe an sleep() erfolgen, wurde die folgende Zeile aus dem Importbereich oben in der Datei entfernt:

      import time
      
    6. Schließen Sie die Datei gce_commands_plugin.py.

    Sensor implementieren und bereitstellen

    Im vorherigen Abschnitt haben Sie alle Operatoren geändert, um einen Namen für einen Compute Engine-Vorgang zurückzugeben. In diesem Abschnitt erstellen Sie mithilfe des Vorgangsnamens einen Airflow-Sensor, um die Compute Engine API bezüglich des Abschlusses jedes Vorgangs abzufragen.

    1. Verwenden Sie in Cloud Shell einen Texteditor wie Nano oder Vim, um die Datei gce_commands_plugin.py aus dem Verzeichnis sensor/plugins zu öffnen:

      vi $HOME/composer-infra-python/sensor/plugins/gce_commands_plugin.py
      

      Beachten Sie die folgende Codezeile oben im Importbereich direkt unter der Zeile from airflow.models import BaseOperator:

      from airflow.operators.sensors import BaseSensorOperator
      

      Alle Sensoren werden von der Klasse BaseSensorOperator abgeleitet und müssen deren Methode poke() überschreiben.

    2. Sehen Sie sich die neue Klasse OperationStatusSensor an:

      class OperationStatusSensor(BaseSensorOperator):
        """Waits for a Compute Engine operation to complete."""
      
        @apply_defaults
        def __init__(self, project, zone, instance, prior_task_id, *args, **kwargs):
          self.compute = self.get_compute_api_client()
          self.project = project
          self.zone = zone
          self.instance = instance
          self.prior_task_id = prior_task_id
          super(OperationStatusSensor, self).__init__(*args, **kwargs)
      
        def get_compute_api_client(self):
          credentials = GoogleCredentials.get_application_default()
          return googleapiclient.discovery.build(
              'compute', 'v1', cache_discovery=False, credentials=credentials)
      
        def poke(self, context):
          operation_name = context['task_instance'].xcom_pull(
              task_ids=self.prior_task_id)
          result = self.compute.zoneOperations().get(
              project=self.project, zone=self.zone,
              operation=operation_name).execute()
      
          logging.info(
              "Task '%s' current status: '%s'", self.prior_task_id, result['status'])
          if result['status'] == 'DONE':
            return True
          else:
            logging.info("Waiting for task '%s' to complete", self.prior_task_id)
            return False

      Die Klasse OperationStatusSensor hat folgende Methoden:

      • __init__: Klassenkonstruktor. Dieser Konstruktor verwendet ähnliche Parameter wie die Operatoren, mit einer Ausnahme: prior_task_id. Dieser Parameter ist die ID der vorherigen Aufgabe.
      • poke: Hauptsensormethode, die von BaseSensorOperator überschrieben wird. Airflow ruft diese Methode alle 60 Sekunden auf, bis die Methode True zurückgibt. Nachfolgende Aufgaben dürfen erst dann ausgeführt werden.

        Sie können das Intervall für diese Wiederholungsversuche konfigurieren. Übergeben Sie dazu den Parameter poke_interval an den Konstruktor. Sie können auch ein timeout festlegen. Weitere Informationen finden Sie in der Referenz zur BaseSensorOperator API

        In der Implementierung der vorstehenden poke-Methode ist die erste Zeile ein Aufruf von xcom_pull(). Diese Methode ruft den neuesten XCom-Wert für die durch prior_task_id identifizierte Aufgabe ab. Der Wert ist der Name eines Compute Engine-Vorgangs und wird in der Variable operation_name gespeichert.

        Der Code führt dann die Methode zoneOperations.get() aus und übergibt operation_name als Parameter, um den neuesten Status für den Vorgang abzurufen. Wenn der Status DONE lautet, gibt die Methode poke() True zurück, andernfalls False. Im ersten Fall werden nachgelagerte Aufgaben gestartet. Im letzteren Fall bleibt die Workflowausführung pausiert und die Methode poke() wird nach poke_interval Sekunden noch einmal aufgerufen.

    3. Unten in der Datei sehen Sie, wie die Klasse GoogleComputeEnginePlugin aktualisiert wurde, um OperationStatusSensor der Liste der vom Plug-in exportierten Operatoren hinzuzufügen:

      class GoogleComputeEnginePlugin(AirflowPlugin):
        """Expose Airflow operators and sensor."""
      
        name = 'gce_commands_plugin'
        operators = [StopInstanceOperator, SnapshotDiskOperator,
                     StartInstanceOperator, OperationStatusSensor]
    4. Schließen Sie die Datei gce_commands_plugin.py.

    Workflow aktualisieren

    Nachdem Sie den Sensor im Plug-in erstellt haben, können Sie ihn in den Workflow einfügen. In diesem Abschnitt aktualisieren Sie den Workflow auf den endgültigen Zustand. Dieser umfasst alle drei Operatoren sowie die dazwischen ausgeführten Sensoraufgaben. Anschließend führen Sie den aktualisierten Workflow aus und prüfen ihn.

    1. Verwenden Sie in Cloud Shell einen Texteditor wie Nano oder Vim, um die Datei backup_vm_instance.py zu öffnen, dieses Mal aus dem Verzeichnis sensor/dags:

      vi $HOME/composer-infra-python/sensor/dags/backup_vm_instance.py
      
      
    2. Beachten Sie im Importbereich, dass der neu erstellte Sensor unterhalb der Zeile from airflow operators import StartInstanceOperator importiert wird:

      from airflow.operators import OperationStatusSensor
      
    3. Untersuchen Sie die Zeilen im Anschluss an den Kommentar ## Wait tasks

      ## Wait tasks
      wait_for_stop = OperationStatusSensor(
          project=PROJECT, zone=ZONE, instance=INSTANCE,
          prior_task_id='stop_instance', poke_interval=15, task_id='wait_for_stop')
      wait_for_snapshot = OperationStatusSensor(
          project=PROJECT, zone=ZONE, instance=INSTANCE,
          prior_task_id='snapshot_disk', poke_interval=10,
          task_id='wait_for_snapshot')
      wait_for_start = OperationStatusSensor(
          project=PROJECT, zone=ZONE, instance=INSTANCE,
          prior_task_id='start_instance', poke_interval=5, task_id='wait_for_start')

      Der Code verwendet OperationStatusSensor wieder, um drei Warteausgaben ("wait tasks") zu definieren. Jede dieser Aufgaben wartet, bis der vorherige Vorgang abgeschlossen ist. Die folgenden Parameter werden an den Sensorkonstruktor übergeben:

      • Die Parameter PROJECT, ZONE und INSTANCE der WordPress-Instanz. Diese sind bereits in der Datei definiert.
      • prior_task_id: ID der Aufgabe, auf die der Sensor wartet. Die Aufgabe wait_for_stop wartet beispielsweise darauf, dass die Aufgabe mit der ID stop_instance abgeschlossen wird.

      • poke_interval: Anzahl der Sekunden, die Airflow zwischen wiederholten Aufrufen der Methode poke() des Sensors warten soll. Mit anderen Worten, die Häufigkeit, mit der geprüft wird, ob prior_task_id bereits ausgeführt wurde.

      • task_id: ID der neu erstellten Warteaufgabe.

    4. Am Ende der Datei sehen Sie, dass der Code:

      begin >> stop_instance >> snapshot_disk >> start_instance >> end
      

      durch diesen Code ersetzt wurde:

      begin >> stop_instance >> wait_for_stop >> snapshot_disk >> wait_for_snapshot \
              >> start_instance >> wait_for_start >> end
      

      Diese Zeilen definieren den vollständigen Sicherungsworkflow.

    5. Schließen Sie die Datei backup_vm_instance.py.

    Kopieren Sie jetzt den DAG und das Plug-in aus dem zugehörigen Cloud Storage-Bucket:

    1. Rufen Sie in Cloud Shell den Bucket-Namen ab:

      BUCKET=$(gsutil ls)
      echo $BUCKET
      

      Sie sollten einen einzelnen Bucket mit einem Namen in diesem Format sehen: gs://[REGION]-[ENVIRONMENT_NAME]-[ID]-bucket/ .

    2. Führen Sie das folgende Skript aus, um die DAG- und Plug-in-Dateien in die entsprechenden Bucket-Verzeichnisse zu kopieren:

      gsutil cp $HOME/composer-infra-python/sensor/plugins/gce_commands_plugin.py "$BUCKET"plugins
      gsutil cp $HOME/composer-infra-python/sensor/dags/backup_vm_instance.py "$BUCKET"dags
      

      Der Bucket-Name enthält bereits einen abschließenden Schrägstrich, daher wird die Variable $BUCKET in doppelte Anführungszeichen gesetzt.

    3. Laden Sie den aktualisierten Workflow in Airflow hoch:

      1. Rufen Sie in der Cloud Console die Seite "Cloud Composer" auf.

        Zur Seite "Cloud Composer"

      2. Klicken Sie in der Spalte Airflow auf den Link Airflow web server, um die Airflow-Startseite anzuzeigen.

      3. Warten Sie zwei oder drei Minuten, bis das Plug-in und der Workflow automatisch von Airflow aktualisiert wurden. Die DAG-Tabelle ist möglicherweise vorübergehend leer. Laden Sie die Seite mehrmals neu, bis der Abschnitt Links konsistent angezeigt wird.

      4. Achten Sie darauf, dass keine Fehler angezeigt werden. Klicken Sie im Abschnitt Links auf Tree View.

        Screenshot der Cloud Composer-Baumansicht Der Workflow wird links als Bottom-up-Baum angezeigt. Rechts sehen Sie ein Diagramm mit den verschiedenen Ausführungsterminen der Aufgabe. Ein grünes Quadrat gibt an, dass die Aufgabe am angegebenen Datum erfolgreich ausgeführt wurde. Ein weißes Quadrat gibt an, dass eine Aufgabe noch nie ausgeführt wurde. Da Sie den DAG mit neuen Sensoraufgaben aktualisiert haben, werden alle diese Aufgaben weiß dargestellt, während Compute Engine-Aufgaben grün gekennzeichnet sind.

      5. Führen Sie den aktualisierten Sicherungsworkflow aus:

        1. Klicken Sie im oberen Menü auf DAGs, um zur Startseite zurückzukehren.
        2. Klicken Sie in der Spalte Links auf Trigger DAG.
        3. Klicken Sie im daraufhin angezeigten Pop-up-Fenster Sind Sie sicher? auf OK. Daraufhin startet ein neuer Workflow, der in der Spalte DAG-Ausführungen als hellgrüner Kreis angezeigt wird.
      6. Klicken Sie unter Links auf das Symbol Graph View, um die Workflowausführung in Echtzeit zu beobachten.

      7. Klicken Sie rechts auf die Schaltfläche "Refresh" (Aktualisieren), um die Aufgabenausführung zu verfolgen. Der Workflow wird bei jeder Sensoraufgabe angehalten, um auf den Abschluss der vorherigen Aufgabe zu warten. Die Wartezeit wird an die Anforderungen jeder Aufgabe angepasst und nicht anhand eines hartcodierten Ruhewerts definiert.

      Screenshot der Aufgabenausführung in Cloud Composer

    4. Optional können Sie während des Workflows zur Cloud Console zurückkehren, das Menü Compute Engine auswählen und auf VM-Instanzen klicken, um zu sehen, wie die virtuelle Maschine angehalten und neu gestartet wird. Sie können auch auf Snapshots klicken, um den neu erstellten Snapshot anzeigen zu lassen.

    Sie haben jetzt einen Sicherungsworkflow ausgeführt, der einen Snapshot einer Compute Engine-Instanz erstellt. Dieser Snapshot folgt Best Practices und optimiert den Workflow mit Sensoren.

    Instanz von einem Snapshot wiederherstellen

    Das Erstellen eines Snapshots ist nur eine Komponente des Sicherungsprozesses. Hinzu kommt das Wiederherstellen einer Instanz von dem Snapshot.

    So erstellen Sie eine Instanz von einem Snapshot:

    1. Rufen Sie in Cloud Shell eine Liste der verfügbaren Snapshots auf:

      gcloud compute snapshots list
      

      Die Ausgabe sieht etwa so aus:

      NAME                              DISK_SIZE_GB  SRC_DISK                            STATUS
      wordpress-1-vm-2018-07-18-120044  10            us-central1-c/disks/wordpress-1-vm  READY
      wordpress-1-vm-2018-07-18-120749  10            us-central1-c/disks/wordpress-1-vm  READY
      wordpress-1-vm-2018-07-18-125138  10            us-central1-c/disks/wordpress-1-vm  READY
      
    2. Wählen Sie einen Snapshot aus und erstellen Sie daraus ein eigenständiges, nichtflüchtiges Bootlaufwerk. Ersetzen Sie die Platzhalter in Klammern durch eigene Werte.

      gcloud compute disks create [DISK_NAME] --source-snapshot [SNAPSHOT_NAME] \
          --zone=[ZONE]
      

      Dabei gilt:

      • DISK_NAME ist der Name des neuen eigenständigen nichtflüchtigen Bootlaufwerks.
      • SNAPSHOT_NAME ist der Snapshot, den Sie in der ersten Spalte der vorherigen Ausgabe ausgewählt haben.
      • ZONE ist die Compute-Zone, in der das neue Laufwerk erstellt wird.
    3. Erstellen Sie auf dem Bootlaufwerk eine neue Instanz. Ersetzen Sie [INSTANCE_NAME] durch den Namen der Instanz, die Sie erstellen möchten.

      gcloud compute instances create [INSTANCE_NAME] --disk name=[DISK_NAME],boot=yes \
          --zone=ZONE --tags=wordpress-1-tcp-443,wordpress-1-tcp-80
      

      Durch die beiden im Befehl angegebenen Tags kann die Instanz automatisch eingehenden Traffic auf den Ports 443 und 80 empfangen. Dies liegt an den Firewallregeln, die für die anfängliche WordPress-Instanz erstellt wurden.

      Beachten Sie die externe IP-Adresse der neuen Instanz, die vom vorherigen Befehl zurückgegeben wurde.

    4. Prüfen Sie, ob WordPress auf der neu erstellten Instanz ausgeführt wird. Rufen Sie auf einem neuen Browsertab die externe IP-Adresse auf. Die Standard-Landingpage von WordPress wird angezeigt.

    5. Alternativ können Sie in der Konsole eine Instanz von einem Snapshot erstellen:

      1. Öffnen Sie in der Cloud Console die Seite "Snapshots":

        Zur Seite "Snapshots"

      2. Klicken Sie auf den neuesten Snapshot.

      3. Klicken Sie auf Instanz erstellen.

      4. Klicken Sie im Formular Neue VM-Instanz auf Verwaltung, Sicherheit, Laufwerke, Netzwerke, einzelne Mandanten und dann auf Netzwerk.

      5. Fügen Sie im Feld Netzwerk-Tags wordpress-1-tcp-443 und wordpress-1-tcp-80 hinzu und drücken Sie nach jedem Tag die Eingabetaste. Eine Erläuterung der Tags finden Sie oben.

      6. Klicken Sie auf Erstellen.

        Basierend auf dem neuesten Snapshot wird eine neue Instanz erstellt, die sofort Inhalt bereitstellen kann.

    6. Rufen Sie die Seite mit den Compute Engine-Instanzen auf und notieren Sie sich die externe IP-Adresse der neuen Instanz.

    7. Prüfen Sie, ob WordPress auf der neu erstellten Instanz ausgeführt wird. Rufen Sie die externe IP-Adresse auf einem neuen Browsertab auf.

    Weitere Informationen finden Sie unter Aus einem Snapshot eine Instanz erstellen.

    Bereinigen

    1. Wechseln Sie in der Cloud Console zur Seite Ressourcen verwalten.

      Zur Seite "Ressourcen verwalten"

    2. Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie dann auf Löschen .
    3. Geben Sie im Dialogfeld die Projekt-ID ein und klicken Sie auf Beenden, um das Projekt zu löschen.

    Weitere Informationen