Mit Sammlungen den Überblick behalten
Sie können Inhalte basierend auf Ihren Einstellungen speichern und kategorisieren.
Auf dieser Seite wird die Pipeline-Orchestrierung mit Cloud Composer und Triggern erläutert. Für Cloud Data Fusion wird die Verwendung von Cloud Composer zum Orchestrating von Pipelines empfohlen. Wenn Sie die Orchestrierung einfacher verwalten möchten, verwenden Sie Trigger.
Composer
Pipelines mit Cloud Composer orchestrieren
Die Pipelineausführung in Cloud Data Fusion mit Cloud Composer zu orchestrieren, bietet folgende Vorteile:
Zentrale Workflowverwaltung:Sie können die Ausführung mehrerer Cloud Data Fusion-Pipelines einheitlich verwalten.
Abhängigkeitsverwaltung:Um die richtige Ausführungsreihenfolge zu gewährleisten, definieren Sie Abhängigkeiten zwischen Pipelines.
Monitoring und Benachrichtigungen:Cloud Composer bietet Monitoringfunktionen und Benachrichtigungen bei Fehlern.
Einbindung in andere Dienste:Mit Cloud Composer können Sie Workflows orchestrieren, die sich über Cloud Data Fusion und andereGoogle Cloud Dienste erstrecken.
So orchestrieren Sie Cloud Data Fusion-Pipelines mit Cloud Composer:
Richten Sie die Cloud Composer-Umgebung ein.
Erstellen Sie eine Cloud Composer-Umgebung. Wenn Sie noch keine haben, stellen Sie die Umgebung in Ihrem Google Cloud Projekt bereit.
Diese Umgebung ist Ihr Orchestration-Arbeitsbereich.
Erteilen Sie Berechtigungen. Das Cloud Composer-Dienstkonto muss die erforderlichen Berechtigungen für den Zugriff auf Cloud Data Fusion haben, z. B. die Berechtigung zum Starten, Stoppen und Auflisten von Pipelines.
Definieren Sie gerichtete azyklische Graphen (Directed Acyclic Graphs, DAG) für die Orchestrierung.
DAG erstellen:Erstellen Sie in Cloud Composer einen DAG, der den Orchestration-Workflow für Ihre Cloud Data Fusion-Pipelines definiert.
Cloud Data Fusion-Operatoren:Verwenden Sie die Cloud Data Fusion-Operatoren von Cloud Composer in Ihrer DAG. Mit diesen Operatoren können Sie programmatisch mit Cloud Data Fusion interagieren.
Cloud Data Fusion-Operatoren
Die Cloud Data Fusion-Pipeline-Orchestrierung bietet die folgenden Operatoren:
CloudDataFusionStartPipelineOperator
Löst die Ausführung einer Cloud Data Fusion-Pipeline anhand ihrer ID aus. Sie hat folgende Parameter:
Pipeline-ID
Standort (Google Cloud Region)
Pipeline-Namespace
Laufzeitargumente (optional)
Auf Abschluss warten (optional)
Zeitüberschreitung (optional)
CloudDataFusionStopPipelineOperator
Hiermit können Sie eine laufende Cloud Data Fusion-Pipeline beenden.
CloudDataFusionDeletePipelineOperator
Löscht eine Cloud Data Fusion-Pipeline.
DAG-Workflow erstellen
Beachten Sie beim Erstellen des DAG-Workflows Folgendes:
Abhängigkeiten definieren:Mit der DAG-Struktur können Sie Abhängigkeiten zwischen Aufgaben definieren. Beispiel: Sie haben eine Aufgabe, die darauf wartet, dass eine Pipeline in einem Namespace erfolgreich abgeschlossen wird, bevor eine andere Pipeline in einem anderen Namespace ausgelöst wird.
Planung:Sie können den DAG so planen, dass er in bestimmten Intervallen ausgeführt wird, z. B. täglich oder stündlich, oder ihn manuell auslösen.
Mit Cloud Data Fusion-Triggern können Sie eine nachgelagerte Pipeline automatisch ausführen, wenn eine oder mehrere vorgelagerte Pipelines abgeschlossen wurden (Erfolg, Fehler oder eine beliebige angegebene Bedingung).
Trigger eignen sich für die folgenden Aufgaben:
Bereinigen Ihrer Daten und für mehrere nachgelagerte Pipelines zur Verwendung zur Verfügung stellen.
Informationen wie Laufzeitargumente und Plug-in-Konfigurationen zwischen Pipelines freigeben. Diese Aufgabe wird als Nutzlastkonfiguration bezeichnet.
Sie haben eine Reihe dynamischer Pipelines, die mit den Daten von Stunde, Tag, Woche oder Monat ausgeführt werden können, anstelle einer statischen Pipeline, die bei jeder Ausführung aktualisiert werden muss.
Angenommen, Sie haben einen Datensatz mit allen Informationen zu den Lieferungen Ihres Unternehmens. Anhand dieser Daten möchten Sie einige geschäftliche Fragen beantworten. Dazu erstellen Sie eine Pipeline, die die Rohdaten zu Sendungen bereinigt. Sie heißt Shipments Data Cleaning (Datenbereinigung für Sendungen). Anschließend erstellen Sie eine zweite Pipeline, Delayed Shipments USA, die die bereinigten Daten liest und die Lieferungen innerhalb der USA findet, die um mehr als einen bestimmten Schwellenwert verzögert wurden. Die Pipeline Verspätete Sendungen – USA kann ausgelöst werden, sobald die vorgelagerte Pipeline Bereinigung der Versanddaten erfolgreich abgeschlossen wurde.
Da die Ausgabe der vorgelagerten Pipeline von der nachgelagerten Pipeline verwendet wird, müssen Sie außerdem angeben, dass die nachgelagerte Pipeline bei der Ausführung mit diesem Trigger auch das Eingabeverzeichnis erhält, aus dem gelesen werden soll. Das ist das Verzeichnis, in dem die vorgelagerte Pipeline die Ausgabe generiert hat. Dieser Vorgang wird als Übergeben der Nutzlastkonfiguration bezeichnet und wird mithilfe von Laufzeitargumenten definiert. Sie können eine Reihe dynamischer Pipelines haben, die mit den Daten der Stunde, des Tages, der Woche oder des Monats ausgeführt werden, und keine statische Pipeline, die bei jeder Ausführung aktualisiert werden muss.
So orchestrieren Sie Pipelines mit Triggern:
Erstellen Sie vorgelagerte und nachgelagerte Pipelines.
Entwerfen und implementieren Sie in Cloud Data Fusion Studio die Pipelines, die Ihre Orchestrationskette bilden.
Überlegen Sie, durch den Abschluss welcher Pipeline die nächste Pipeline (Downstream) in Ihrem Workflow aktiviert wird.
Optional: Laufzeitargumente für vorgelagerte Pipelines übergeben
Wenn Sie die Nutzlastkonfiguration als Laufzeitargumente zwischen Pipelines übergeben möchten, konfigurieren Sie die Laufzeitargumente. Diese Argumente können während der Ausführung an die nachgelagerte Pipeline übergeben werden.
Erstellen Sie einen eingehenden Trigger in der nachgelagerten Pipeline.
Rufen Sie in Cloud Data Fusion Studio die Seite Liste auf. Klicken Sie auf dem Tab Bereitgestellt auf den Namen der nachgelagerten Pipeline. Die Bereitstellungsansicht für diese Pipeline wird angezeigt.
Klicken Sie in der Mitte der linken Seite der Seite auf Eingehende Trigger.
Eine Liste der verfügbaren Pipelines wird angezeigt.
Klicken Sie auf die vorgelagerte Pipeline. Wählen Sie einen oder mehrere Abschlussstatus der vorgelagerten Pipeline als Bedingung aus (Erfolgreich, Fehlgeschlagen oder Angehalten), für den Zeitpunkt, an dem die nachgelagerte Pipeline ausgeführt werden soll.
Wenn die vorgelagerte Pipeline Informationen (auch als Nutzlastkonfiguration bezeichnet) an die nachgelagerte Pipeline weitergeben soll, klicken Sie auf Triggerkonfiguration und folgen Sie den Schritten für das Übergeben der Nutzlastkonfiguration als Laufzeitargument.
Klicken Sie andernfalls auf Trigger aktivieren.
Testen Sie den Trigger.
Starten Sie eine Ausführung der vorgelagerten Pipeline.
Wenn der Trigger richtig konfiguriert ist, wird die nachgelagerte Pipeline nach Abschluss der vorgelagerten Pipelines automatisch ausgeführt, basierend auf der konfigurierten Bedingung.
Nutzlastkonfiguration als Laufzeitargumente übergeben
Die Nutzlastkonfiguration ermöglicht die Freigabe von Informationen von der vorgelagerten Pipeline an die nachgelagerte Pipeline. Diese Informationen können beispielsweise das Ausgabeverzeichnis, das Datenformat oder der Tag sein, an dem die Pipeline ausgeführt wurde. Diese Informationen werden dann von der nachgelagerten Pipeline für Entscheidungen verwendet, z. B. zum Bestimmen des richtigen Datasets, aus dem gelesen werden soll.
Wenn Sie Informationen von der vorgelagerten Pipeline an die nachgelagerte Pipeline weitergeben möchten, legen Sie die Laufzeitargumente der nachgelagerten Pipeline mit den Werten der Laufzeitargumente oder der Konfiguration eines beliebigen Plug-ins in der vorgelagerten Pipeline fest.
Bei jeder Auslösung und Ausführung der nachgelagerten Pipeline wird deren Nutzlastkonfiguration mit den Laufzeitargumenten der jeweiligen Ausführung der vorgelagerten Pipeline festgelegt, die die nachgelagerte Pipeline ausgelöst hat.
So übergeben Sie die Nutzlastkonfiguration als Laufzeitargumente:
Wenn Sie an der Stelle im Abschnitt Eingehenden Trigger erstellen fortfahren, wo Sie aufgehört haben, nachdem Sie auf Triggerkonfiguration geklickt haben, werden alle Laufzeitargumente angezeigt, die Sie für die vorgelagerte Pipeline zuvor festgelegt haben. Wählen Sie die Laufzeitargumente aus, die von der vorgelagerten Pipeline an die nachgelagerte Pipeline übergeben werden sollen, wenn dieser Trigger ausgeführt wird.
Klicken Sie auf den Tab Plug-in-Konfiguration, um eine Liste der Elemente zu öffnen, die von Ihrer vorgelagerten Pipeline an Ihre nachgelagerte Pipeline übergeben werden, wenn diese ausgelöst wird.
Klicken Sie auf Trigger konfigurieren und aktivieren.
[[["Leicht verständlich","easyToUnderstand","thumb-up"],["Mein Problem wurde gelöst","solvedMyProblem","thumb-up"],["Sonstiges","otherUp","thumb-up"]],[["Schwer verständlich","hardToUnderstand","thumb-down"],["Informationen oder Beispielcode falsch","incorrectInformationOrSampleCode","thumb-down"],["Benötigte Informationen/Beispiele nicht gefunden","missingTheInformationSamplesINeed","thumb-down"],["Problem mit der Übersetzung","translationIssue","thumb-down"],["Sonstiges","otherDown","thumb-down"]],["Zuletzt aktualisiert: 2025-09-04 (UTC)."],[[["\u003cp\u003eCloud Composer can orchestrate multiple Cloud Data Fusion pipelines, offering centralized workflow and dependency management, monitoring, alerting, and integration with other Google Cloud services.\u003c/p\u003e\n"],["\u003cp\u003eCloud Composer uses Directed Acyclic Graphs (DAGs) and Cloud Data Fusion Operators to define and manage pipeline orchestration, including starting, stopping, and deleting pipelines.\u003c/p\u003e\n"],["\u003cp\u003eTriggers in Cloud Data Fusion allow automatic execution of downstream pipelines upon completion of upstream pipelines, based on success, failure, or other conditions.\u003c/p\u003e\n"],["\u003cp\u003eTriggers facilitate dynamic pipelines by enabling the sharing of runtime arguments and plugin configurations (payload configuration) between upstream and downstream pipelines.\u003c/p\u003e\n"],["\u003cp\u003eUsing payload configuration with triggers, the downstream pipeline can receive information, such as output directory and data format, from the upstream pipeline.\u003c/p\u003e\n"]]],[],null,["# Orchestrate pipelines\n\nThis page explains pipeline orchestration with Cloud Composer and\ntriggers. Cloud Data Fusion recommends using Cloud Composer to\norchestrate pipelines. If you require a simpler way to manage orchestration, use\ntriggers. \n\n### Composer\n\nOrchestrate pipelines with Cloud Composer\n-----------------------------------------\n\nOrchestrating pipeline execution in Cloud Data Fusion with\nCloud Composer provides following benefits:\n\n- **Centralized workflow management:** uniformly manage the execution of multiple Cloud Data Fusion pipelines.\n- **Dependency management:** to ensure proper execution order, define dependencies between pipelines.\n- **Monitoring and alerting:** Cloud Composer provides monitoring capabilities and alerts for failures.\n- **Integration with other services:** Cloud Composer lets you orchestrate workflows that span across Cloud Data Fusion and other Google Cloud services.\n\nTo orchestrate Cloud Data Fusion pipelines using\nCloud Composer, follow this process:\n\n1. **Set up the Cloud Composer environment.**\n\n - **Create a Cloud Composer environment.** If you don't have one, provision the environment in your Google Cloud project. This environment is your orchestration workspace.\n - **Give permissions.** Ensure the Cloud Composer service account has the necessary permissions to access Cloud Data Fusion (such as permission to start, stop, and list pipelines).\n2. **Define Directed Acyclic Graphs (DAG) for orchestration.**\n\n - **Create a DAG:** In Cloud Composer, create a DAG that defines the orchestration workflow for your Cloud Data Fusion pipelines.\n - **Cloud Data Fusion Operators:** Use Cloud Composer's Cloud Data Fusion Operators within your DAG. These operators let you interact programmatically with Cloud Data Fusion.\n\n### Cloud Data Fusion operators\n\nCloud Data Fusion pipeline orchestration has the following operators:\n\n`CloudDataFusionStartPipelineOperator`\n\n: Triggers the execution of a Cloud Data Fusion pipeline by its ID. It\n has the following parameters:\n\n - Pipeline ID\n - Location (Google Cloud region)\n - Pipeline namespace\n - Runtime arguments (optional)\n - Wait for completion (optional)\n - Timeout (optional)\n\n`CloudDataFusionStopPipelineOperator`\n\n: Lets you stop a running Cloud Data Fusion pipeline.\n\n`CloudDataFusionDeletePipelineOperator`\n\n: Deletes a Cloud Data Fusion pipeline.\n\n### Build the DAG workflow\n\nWhen you build the DAG workflow, consider the following:\n\n- **Defining dependencies:** Use the DAG structure to define dependencies between tasks. For example, you might have a task that waits for a pipeline in one namespace to complete successfully before triggering another pipeline in a different namespace.\n- **Scheduling:** Schedule the DAG to run at specific intervals, such as daily or hourly, or set it to be triggered manually.\n\nFor more information, see the\n[Cloud Composer overview](/composer/docs/concepts/overview).\n\n### Triggers\n\nOrchestrate pipelines with triggers\n-----------------------------------\n\nCloud Data Fusion triggers let you automatically execute a downstream\npipeline upon the completion (success, failure, or any specified condition)\nof one or more upstream pipelines.\n\nTriggers are useful for the following tasks:\n\n- Cleaning your data once, and then making it available to multiple downstream pipelines for consumption.\n- Sharing information, such as runtime arguments and plugin configurations, between pipelines. This task is called *payload\n configuration*.\n- Having a set of dynamic pipelines that run using the data from the hour, day, week, or month, instead of a static pipeline that must be updated for every run.\n\nFor example, you have a dataset that contains all information about your\ncompany's shipments. Based on this data, you want to answer several business\nquestions. To do this, you create one pipeline that cleanses the raw data\nabout shipments, called *Shipments Data Cleaning* . Then you create a second\npipeline, *Delayed Shipments USA* , which reads the cleansed data and finds\nthe shipments within the USA that were delayed by more than a specified\nthreshold. The *Delayed Shipments USA* pipeline can be triggered as soon as\nthe upstream *Shipments Data Cleaning* pipeline successfully completes.\n\nAdditionally, since the downstream pipeline consumes the output of the\nupstream pipeline, you must specify that when the downstream pipeline runs\nusing this trigger, it also receives the input directory to read from (which\nis the directory where the upstream pipeline generated its output). This\nprocess is called *passing payload configuration*, which you define with\nruntime arguments. It lets you have a set of dynamic pipelines that\nrun using the data of the hour, day, week, or month (not a static pipeline,\nwhich must be updated for every run).\n| **Note:** Don't trigger upgrades with Terraform. For more information, see the [limitations for Cloud Data Fusion upgrades](/data-fusion/docs/how-to/upgrading#limitations).\n\nTo orchestrate pipelines with triggers, follow this process:\n\n1. **Create upstream and downstream pipelines.**\n\n - In the Cloud Data Fusion Studio, design and deploy the pipelines that form your orchestration chain.\n - Consider which pipeline's completion will activate the next pipeline (downstream) in your workflow.\n2. **Optional: pass runtime arguments for upstream pipelines.**\n\n - If you need to [pass payload configuration as runtime arguments](#pass-payload-configs) between pipelines, configure runtime arguments. These arguments can be passed to the downstream pipeline during execution.\n3. **Create an inbound trigger on the downstream pipeline.**\n\n - In the Cloud Data Fusion Studio, go to the **List** page. In the **Deployed** tab, click the name of the downstream pipeline. The Deploy view for that pipeline appears.\n - On the middle left side of the page, click **Inbound triggers**. A list of available pipelines appears.\n - Click the upstream pipeline. Select one or more of the upstream pipeline completion states (**Succeeds** , **Fails** , or **Stops**) as the condition for when the downstream pipeline should run.\n - If you want the upstream pipeline to share information (called *payload configuration* ) with the downstream pipeline, click **Trigger config** and then follow the steps to [pass payload configuration as runtime arguments](#pass-payload-configs). Otherwise, click **Enable trigger**.\n4. **Test the trigger.**\n\n - Initiate a run of the upstream pipeline.\n - If the trigger is configured correctly, the downstream pipeline automatically executes upon completion of the upstream pipelines, based on your configured condition.\n\n### Pass payload configuration as runtime arguments\n\nPayload configuration allows sharing of information from the upstream\npipeline to the downstream pipeline. This information can be, for example,\nthe output directory, the data format, or the day the pipeline was run. This\ninformation is then used by the downstream pipeline for decisions such as\ndetermining the right dataset to read from.\n\nTo pass information from the upstream pipeline to the downstream pipeline,\nyou set the runtime arguments of the downstream pipeline with the values of\neither the runtime arguments or the configuration of any plugin in the\nupstream pipeline.\n\nWhenever the downstream pipeline triggers and runs, its payload\nconfiguration is set using the runtime arguments of the particular run of\nthe upstream pipeline that triggered the downstream pipeline.\n\nTo pass payload configuration as runtime arguments, follow these steps:\n\n1. Picking up where you left off in the [Creating an inbound trigger](/data-fusion/docs/how-to/using-triggers#create_inbound_trigger), after clicking **Trigger config** , any runtime arguments you [previously set](/data-fusion/docs/how-to/using-triggers#before_you_begin) for your upstream pipeline will appear. Choose the runtime arguments to pass from the upstream pipeline to the downstream pipeline when this trigger executes.\n2. Click the **Plugin config** tab to see a list of what will be passed from your upstream pipeline to your downstream pipeline when it is triggered.\n3. Click **Configure and Enable Trigger**."]]