Pipelines orchestrieren

Auf dieser Seite werden die Pipelineorchestrierung mit Cloud Composer und Trigger. Für Cloud Data Fusion wird die Verwendung von Cloud Composer zum Orchestrating von Pipelines empfohlen. Wenn Sie die Orchestrierung einfacher verwalten möchten, verwenden Sie Trigger.

Composer

Pipelines mit Cloud Composer orchestrieren

Pipelineausführung in Cloud Data Fusion orchestrieren mit Cloud Composer bietet folgende Vorteile:

  • Zentrale Workflowverwaltung: Sie können die Ausführung mehrerer Cloud Data Fusion-Pipelines einheitlich verwalten.
  • Abhängigkeitsverwaltung: Um die richtige Ausführungsreihenfolge zu gewährleisten, definieren Sie Abhängigkeiten zwischen Pipelines.
  • Monitoring und Benachrichtigungen: Cloud Composer bietet Monitoringfunktionen und Benachrichtigungen bei Fehlern.
  • Integration in andere Dienste:Mit Cloud Composer können Sie Workflows zu orchestrieren, die sich über Cloud Data Fusion und andere Google Cloud-Dienste.

So orchestrieren Sie Cloud Data Fusion-Pipelines mit Cloud Composer:

  1. Richten Sie die Cloud Composer-Umgebung ein.

    • Erstellen Sie eine Cloud Composer-Umgebung. Wenn Sie noch keine haben, stellen Sie die Umgebung in Ihrem Google Cloud-Projekt bereit. Diese Umgebung ist Ihr Orchestrierungsarbeitsbereich.
    • Berechtigungen erteilen. Achten Sie darauf, dass Cloud Composer Dienstkonto hat die erforderlichen Berechtigungen für den Zugriff Cloud Data Fusion (z. B. Berechtigung zum Starten, Stoppen und Auflisten Pipelines).
  2. Definieren Sie gerichtete azyklische Graphen (Directed Acyclic Graphs, DAG) für die Orchestrierung.

    • DAG erstellen: Erstellen Sie in Cloud Composer einen DAG, der den Orchestration-Workflow für Ihre Cloud Data Fusion-Pipelines definiert.
    • Cloud Data Fusion-Operatoren:Verwenden Sie die Operatoren von Cloud Composer Cloud Data Fusion-Operatoren in Ihrem DAG. Mit diesen Operatoren können Sie programmatisch mit Cloud Data Fusion interagieren.

Cloud Data Fusion-Operatoren

Die Pipelineorchestrierung von Cloud Data Fusion umfasst die folgenden Operatoren:

CloudDataFusionStartPipelineOperator

Löst die Ausführung einer Cloud Data Fusion-Pipeline anhand ihrer ID aus. Sie hat folgende Parameter:

  • Pipeline-ID
  • Standort (Google Cloud-Region)
  • Pipeline-Namespace
  • Laufzeitargumente (optional)
  • Auf Abschluss warten (optional)
  • Zeitüberschreitung (optional)
CloudDataFusionStopPipelineOperator

Hiermit können Sie eine ausgeführte Cloud Data Fusion-Pipeline anhalten.

CloudDataFusionDeletePipelineOperator

Löscht eine Cloud Data Fusion-Pipeline.

DAG-Workflow erstellen

Beachten Sie beim Erstellen des DAG-Workflows Folgendes:

  • Abhängigkeiten definieren: Mit der DAG-Struktur können Sie Abhängigkeiten zwischen Aufgaben definieren. Beispiel: Sie haben eine Aufgabe, die darauf wartet, dass eine Pipeline in einem Namespace erfolgreich abgeschlossen wird, bevor eine andere Pipeline in einem anderen Namespace ausgelöst wird.
  • Planung: Sie können den DAG so planen, dass er in bestimmten Intervallen ausgeführt wird, z. B. täglich oder stündlich, oder ihn manuell auslösen.

Weitere Informationen finden Sie in der Cloud Composer – Übersicht

Trigger

Pipelines mit Triggern orchestrieren

Mit Cloud Data Fusion-Triggern können Sie eine nachgelagerte Pipeline automatisch ausführen, wenn eine oder mehrere vorgelagerte Pipelines abgeschlossen sind (Erfolg, Fehler oder eine beliebige angegebene Bedingung).

Trigger sind für die folgenden Aufgaben nützlich:

  • Daten einmalig bereinigt und dann für mehrere Personen nachgelagerte Pipelines für die Nutzung.
  • Freigabeinformationen wie Laufzeitargumente und Plug-in Konfigurationen zwischen Pipelines. Diese Aufgabe wird als Nutzlastkonfiguration bezeichnet.
  • Mit einer Reihe dynamischer Pipelines, die mit den Daten der Stunde ausgeführt werden, Tag, Woche oder Monat statt einer statischen Pipeline, die aktualisiert werden muss, bei jedem Lauf.

Angenommen, Sie haben einen Datensatz mit allen Informationen zu den Lieferungen Ihres Unternehmens. Anhand dieser Daten möchten Sie mehrere geschäftliche Fragen beantworten. Dazu erstellen Sie eine Pipeline, die die Rohdaten zu Sendungen bereinigt. Sie heißt Shipments Data Cleaning (Datenbereinigung für Sendungen). Anschließend erstellen Sie eine zweite Pipeline, Delayed Shipments USA, die die bereinigten Daten liest und die Lieferungen innerhalb der USA findet, die um mehr als einen bestimmten Schwellenwert verzögert wurden. Die Pipeline Verspätete Sendungen – USA kann ausgelöst werden, sobald die vorgelagerte Pipeline Bereinigung der Versanddaten erfolgreich abgeschlossen wurde.

Da die nachgelagerte Pipeline außerdem die Ausgabe des vorgelagerte Pipeline müssen Sie angeben, dass beim Ausführen der nachgelagerten Pipeline mit diesem Trigger empfängt er auch das Eingabeverzeichnis, aus dem gelesen werden soll (aus dem das Verzeichnis ist, in dem die vorgelagerte Pipeline ihre Ausgabe generiert hat). Dieses wird als Weitergabe-Nutzlastkonfiguration bezeichnet, die Sie mit Laufzeitargumente. Sie können eine Reihe dynamischer Pipelines haben, die mit den Daten der Stunde, des Tages, der Woche oder des Monats ausgeführt werden, anstelle einer statischen Pipeline, die bei jeder Ausführung aktualisiert werden muss.

So orchestrieren Sie Pipelines mit Triggern:

  1. Vor- und nachgelagerte Pipelines erstellen.

    • Erstellen Sie in Cloud Data Fusion Studio die Pipelines, die Ihre Orchestrierungskette bilden.
    • Überlegen Sie, durch den Abschluss welcher Pipeline die nächste Pipeline (Downstream) in Ihrem Workflow aktiviert wird.
  2. Optional: Laufzeitargumente für vorgelagerte Pipelines übergeben

    • Wenn Sie die Nutzlastkonfiguration als Laufzeitargumente zwischen Pipelines übergeben möchten, konfigurieren Sie die Laufzeitargumente. Diese Argumente können während der Ausführung an die nachgelagerte Pipeline übergeben werden.
  3. Erstellen Sie einen eingehenden Trigger in der nachgelagerten Pipeline.

    • Rufen Sie in Cloud Data Fusion Studio die Seite Liste auf. In Klicken Sie auf dem Tab Bereitgestellt auf den Namen der nachgelagerten Pipeline. Die Die Bereitstellungsansicht für diese Pipeline wird angezeigt.
    • Klicken Sie in der Mitte der linken Seite der Seite auf Eingehende Trigger. Eine Liste der verfügbaren Pipelines wird angezeigt.
    • Klicken Sie auf die vorgelagerte Pipeline. Wählen Sie mindestens eine der Upstream- Pipeline-Abschlussstatus (Erfolgreich, Fehler oder Stopps) als Bedingung für die Ausführung der nachgelagerten Pipeline.
    • Wenn Sie möchten, dass die vorgelagerte Pipeline Informationen (sogenannte Nutzlastkonfiguration) mit der nachgelagerten Pipeline, klicken Sie auf Triggerkonfiguration aus und führen Sie dann die folgenden Schritte aus: Nutzlastkonfiguration als Laufzeitargumente übergeben. Klicken Sie andernfalls auf Trigger aktivieren.
  4. Testen Sie den Trigger.

    • Initiieren Sie eine Ausführung der vorgelagerten Pipeline.
    • Wenn der Trigger richtig konfiguriert ist, kann die nachgelagerte Pipeline wird nach Abschluss der vorgelagerten Pipelines automatisch ausgeführt. basierend auf der konfigurierten Bedingung.

Nutzlastkonfiguration als Laufzeitargumente übergeben

Die Nutzlastkonfiguration ermöglicht die Freigabe von Informationen von der vorgelagerten Pipeline an die nachgelagerte Pipeline. Diese Informationen können beispielsweise das Ausgabeverzeichnis, das Datenformat oder den Tag, an dem die Pipeline ausgeführt wurde. Diese Informationen werden dann von der nachgelagerten Pipeline für Entscheidungen verwendet, z. B. zum Bestimmen des richtigen Datasets, aus dem gelesen werden soll.

Um Informationen von der vorgelagerten Pipeline an die nachgelagerte Pipeline zu übergeben, legen Sie für die Laufzeitargumente der nachgelagerten Pipeline die Werte entweder die Laufzeitargumente oder die Konfiguration eines Plug-ins im vorgelagerte Pipeline nutzen.

Bei jedem Auslösen und Ausführen der nachgelagerten Pipeline wird ihre Nutzlast wird die Konfiguration mithilfe der Laufzeitargumente des jeweiligen Durchlaufs der vorgelagerten Pipeline, die die nachgelagerte Pipeline ausgelöst hat.

So übergeben Sie die Nutzlastkonfiguration als Laufzeitargumente:

  1. Wenn Sie an der Stelle im Abschnitt Eingehenden Trigger erstellen fortfahren, wo Sie aufgehört haben, nachdem Sie auf Triggerkonfiguration geklickt haben, werden alle Laufzeitargumente angezeigt, die Sie für die vorgelagerte Pipeline zuvor festgelegt haben. Wählen Sie die Laufzeitargumente aus, die von der vorgelagerten Pipeline an die nachgelagerte Pipeline übergeben werden sollen, wenn dieser Trigger ausgeführt wird.
  2. Klicken Sie auf den Tab Plug-in-Konfiguration, um eine Liste der Elemente zu öffnen, die von Ihrer vorgelagerten Pipeline an Ihre nachgelagerte Pipeline übergeben werden, wenn diese ausgelöst wird.
  3. Klicken Sie auf Trigger konfigurieren und aktivieren.