Mantieni tutto organizzato con le raccolte
Salva e classifica i contenuti in base alle tue preferenze.
Questa pagina spiega l'orchestrazione delle pipeline con Cloud Composer e gli attivatori. Cloud Data Fusion consiglia di utilizzare Cloud Composer per orchestrare le pipeline. Se hai bisogno di un modo più semplice per gestire l'orchestrazione, utilizza gli attivatori.
Composer
Orchestrare le pipeline con Cloud Composer
L'orchestrazione dell'esecuzione della pipeline in Cloud Data Fusion con Cloud Composer offre i seguenti vantaggi:
Gestione centralizzata del flusso di lavoro:gestisci in modo uniforme l'esecuzione di più pipeline Cloud Data Fusion.
Gestione delle dipendenze:per garantire l'ordine di esecuzione corretto, definisci le dipendenze tra le pipeline.
Monitoraggio e avvisi:Cloud Composer offre funzionalità di monitoraggio e avvisi per gli errori.
Integrazione con altri servizi:Cloud Composer consente di orchestrare flussi di lavoro che si estendono a Cloud Data Fusion e ad altri serviziGoogle Cloud .
Per orchestrare le pipeline Cloud Data Fusion utilizzando Cloud Composer, segui questa procedura:
Configura l'ambiente Cloud Composer.
Crea un ambiente Cloud Composer. Se non ne hai uno, esegui il provisioning dell'ambiente nel tuo Google Cloud progetto.
Questo ambiente è lo spazio di lavoro di orchestrazione.
Concedi le autorizzazioni. Assicurati che l'account di servizio Cloud Composer disponga delle autorizzazioni necessarie per accedere a Cloud Data Fusion (ad esempio l'autorizzazione per avviare, interrompere ed elencare le pipeline).
Definisci i grafici diretti aciclici (DAG) per l'orchestrazione.
Crea un DAG:in Cloud Composer, crea un DAG che definisce il flusso di lavoro di orchestrazione per le pipeline Cloud Data Fusion.
Operatori Cloud Data Fusion:utilizza gli operatori Cloud Data Fusion di Cloud Composer all'interno del tuo DAG. Questi operatori
ti consentono di interagire in modo programmatico con Cloud Data Fusion.
Operatori Cloud Data Fusion
L'orchestrazione delle pipeline di Cloud Data Fusion dispone dei seguenti operatori:
CloudDataFusionStartPipelineOperator
Attiva l'esecuzione di una pipeline Cloud Data Fusion in base al relativo ID. Ha i seguenti parametri:
ID pipeline
Località (Google Cloud regione)
Spazio dei nomi della pipeline
(Facoltativo) Argomenti di runtime
(Facoltativo) Attendi il completamento
Timeout (facoltativo)
CloudDataFusionStopPipelineOperator
Ti consente di interrompere una pipeline Cloud Data Fusion in esecuzione.
CloudDataFusionDeletePipelineOperator
Consente di eliminare una pipeline Cloud Data Fusion.
Crea il flusso di lavoro DAG
Quando crei il flusso di lavoro DAG, considera quanto segue:
Definizione delle dipendenze:utilizza la struttura DAG per definire le dipendenze tra le attività. Ad esempio, potresti avere un'attività che attende il completamento di una pipeline in uno spazio dei nomi prima di attivarne un'altra in uno spazio dei nomi diverso.
Pianificazione:pianifica l'esecuzione del DAG a intervalli specifici, ad esempio giornalieri o ogni ora, oppure impostalo in modo che venga attivato manualmente.
Gli attivatori Cloud Data Fusion ti consentono di eseguire automaticamente una
pipeline a valle al termine (successo, errore o qualsiasi condizione specificata)
di una o più pipeline a monte.
Gli attivatori sono utili per le seguenti attività:
Pulisci i dati una volta e poi mettili a disposizione di più
pipeline a valle per il consumo.
Condivisione di informazioni, ad esempio argomenti di runtime e configurazioni dei plug-in, tra le pipeline. Questa attività è chiamata configurazione del payload.
Avere un insieme di pipeline dinamiche che vengono eseguite utilizzando i dati dell'ora,
del giorno, della settimana o del mese, anziché una pipeline statica che deve essere aggiornata
per ogni esecuzione.
Ad esempio, hai un set di dati che contiene tutte le informazioni sulle spedizioni della tua azienda. In base a questi dati, vuoi rispondere a diverse domande sulla tua attività. A tale scopo, crea una pipeline che pulisca i dati non elaborati sulle spedizioni, chiamata Shipments Data Cleaning (Pulizia dei dati sulle spedizioni). Poi crei una seconda
pipeline, Spedizioni in ritardo negli Stati Uniti, che legge i dati ripuliti e trova
le spedizioni all'interno degli Stati Uniti che hanno subito ritardi superiori a una specificata
soglia. La pipeline Spedizioni in ritardo negli Stati Uniti può essere attivata non appena la pipeline Pulizia dei dati sulle spedizioni a monte viene completata correttamente.
Inoltre, poiché la pipeline downstream utilizza l'output della pipeline upstream, devi specificare che, quando viene eseguita utilizzando questo attivatore, riceve anche la directory di input da leggere (ovvero la directory in cui la pipeline upstream ha generato il proprio output). Questo processo è chiamato passaggio della configurazione del payload e viene definito con gli argomenti di runtime. Ti consente di avere un insieme di pipeline dinamiche che vengono eseguite utilizzando i dati dell'ora, del giorno, della settimana o del mese (non una pipeline statica, che deve essere aggiornata per ogni esecuzione).
Per orchestrare le pipeline con gli attivatori, segui questa procedura:
Crea pipeline a monte e a valle.
In Cloud Data Fusion Studio, progetta e esegui il deployment delle
pipeline che formano la catena di orchestrazione.
Valuta il completamento della pipeline che attiverà la pipeline successiva (a valle) nel flusso di lavoro.
(Facoltativo) Passa gli argomenti di runtime per le pipeline a monte.
Crea un trigger in entrata nella pipeline a valle.
In Studio di Cloud Data Fusion, vai alla pagina Elenco. Nella scheda Eseguito, fai clic sul nome della pipeline a valle. Viene visualizzata la
visualizzazione di Deployment per la pipeline.
Nella parte centrale sinistra della pagina, fai clic su Attivatori in entrata.
Viene visualizzato un elenco di pipeline disponibili.
Fai clic sulla pipeline a monte. Seleziona uno o più stati di completamento della pipeline a monte (Succeeds, Fails o Stops) come condizione per l'esecuzione della pipeline a valle.
Se vuoi che la pipeline a monte condivida informazioni (chiamate
configurazione del payload) con la pipeline a valle, fai clic su
Configurazione trigger e poi segui i passaggi per
trasmettere la configurazione del payload come argomenti di runtime.
In caso contrario, fai clic su Attiva trigger.
Testa l'attivatore.
Avvia un'esecuzione della pipeline a monte.
Se l'attivatore è configurato correttamente, la pipeline a valle viene eseguita automaticamente al termine delle pipeline a monte, in base alla condizione configurata.
Passare la configurazione del payload come argomenti di runtime
La configurazione del payload consente di condividere informazioni dalla pipeline di upstream alla pipeline di downstream. Queste informazioni possono essere, ad esempio, la directory di output, il formato dei dati o il giorno in cui è stata eseguita la pipeline. Queste informazioni vengono poi utilizzate dalla pipeline a valle per decisioni come la determinazione del set di dati corretto da leggere.
Per trasmettere le informazioni dalla pipeline a monte a quella a valle, imposta gli argomenti di runtime della pipeline a valle con i valori degli argomenti di runtime o della configurazione di qualsiasi plug-in nella pipeline a monte.
Ogni volta che la pipeline a valle viene attivata ed eseguita, la configurazione del suo payload viene impostata utilizzando gli argomenti di runtime della determinata esecuzione della pipeline a monte che ha attivato la pipeline a valle.
Per passare la configurazione del payload come argomenti di runtime:
Riprendi da dove avevi interrotto nella sezione Creazione di un trigger in entrata.
Dopo aver fatto clic su Configurazione trigger, verranno visualizzati tutti gli argomenti di runtime che hai
impostato in precedenza per la pipeline a monte. Scegli gli argomenti di runtime da passare dalla pipeline a monte alla pipeline a valle quando viene eseguito questo attivatore.
Fai clic sulla scheda Configurazione plug-in per visualizzare un elenco di ciò che verrà trasmesso dalla pipeline a monte alla pipeline a valle quando viene attivata.
[[["Facile da capire","easyToUnderstand","thumb-up"],["Il problema è stato risolto","solvedMyProblem","thumb-up"],["Altra","otherUp","thumb-up"]],[["Difficile da capire","hardToUnderstand","thumb-down"],["Informazioni o codice di esempio errati","incorrectInformationOrSampleCode","thumb-down"],["Mancano le informazioni o gli esempi di cui ho bisogno","missingTheInformationSamplesINeed","thumb-down"],["Problema di traduzione","translationIssue","thumb-down"],["Altra","otherDown","thumb-down"]],["Ultimo aggiornamento 2025-09-04 UTC."],[[["\u003cp\u003eCloud Composer can orchestrate multiple Cloud Data Fusion pipelines, offering centralized workflow and dependency management, monitoring, alerting, and integration with other Google Cloud services.\u003c/p\u003e\n"],["\u003cp\u003eCloud Composer uses Directed Acyclic Graphs (DAGs) and Cloud Data Fusion Operators to define and manage pipeline orchestration, including starting, stopping, and deleting pipelines.\u003c/p\u003e\n"],["\u003cp\u003eTriggers in Cloud Data Fusion allow automatic execution of downstream pipelines upon completion of upstream pipelines, based on success, failure, or other conditions.\u003c/p\u003e\n"],["\u003cp\u003eTriggers facilitate dynamic pipelines by enabling the sharing of runtime arguments and plugin configurations (payload configuration) between upstream and downstream pipelines.\u003c/p\u003e\n"],["\u003cp\u003eUsing payload configuration with triggers, the downstream pipeline can receive information, such as output directory and data format, from the upstream pipeline.\u003c/p\u003e\n"]]],[],null,["# Orchestrate pipelines\n\nThis page explains pipeline orchestration with Cloud Composer and\ntriggers. Cloud Data Fusion recommends using Cloud Composer to\norchestrate pipelines. If you require a simpler way to manage orchestration, use\ntriggers. \n\n### Composer\n\nOrchestrate pipelines with Cloud Composer\n-----------------------------------------\n\nOrchestrating pipeline execution in Cloud Data Fusion with\nCloud Composer provides following benefits:\n\n- **Centralized workflow management:** uniformly manage the execution of multiple Cloud Data Fusion pipelines.\n- **Dependency management:** to ensure proper execution order, define dependencies between pipelines.\n- **Monitoring and alerting:** Cloud Composer provides monitoring capabilities and alerts for failures.\n- **Integration with other services:** Cloud Composer lets you orchestrate workflows that span across Cloud Data Fusion and other Google Cloud services.\n\nTo orchestrate Cloud Data Fusion pipelines using\nCloud Composer, follow this process:\n\n1. **Set up the Cloud Composer environment.**\n\n - **Create a Cloud Composer environment.** If you don't have one, provision the environment in your Google Cloud project. This environment is your orchestration workspace.\n - **Give permissions.** Ensure the Cloud Composer service account has the necessary permissions to access Cloud Data Fusion (such as permission to start, stop, and list pipelines).\n2. **Define Directed Acyclic Graphs (DAG) for orchestration.**\n\n - **Create a DAG:** In Cloud Composer, create a DAG that defines the orchestration workflow for your Cloud Data Fusion pipelines.\n - **Cloud Data Fusion Operators:** Use Cloud Composer's Cloud Data Fusion Operators within your DAG. These operators let you interact programmatically with Cloud Data Fusion.\n\n### Cloud Data Fusion operators\n\nCloud Data Fusion pipeline orchestration has the following operators:\n\n`CloudDataFusionStartPipelineOperator`\n\n: Triggers the execution of a Cloud Data Fusion pipeline by its ID. It\n has the following parameters:\n\n - Pipeline ID\n - Location (Google Cloud region)\n - Pipeline namespace\n - Runtime arguments (optional)\n - Wait for completion (optional)\n - Timeout (optional)\n\n`CloudDataFusionStopPipelineOperator`\n\n: Lets you stop a running Cloud Data Fusion pipeline.\n\n`CloudDataFusionDeletePipelineOperator`\n\n: Deletes a Cloud Data Fusion pipeline.\n\n### Build the DAG workflow\n\nWhen you build the DAG workflow, consider the following:\n\n- **Defining dependencies:** Use the DAG structure to define dependencies between tasks. For example, you might have a task that waits for a pipeline in one namespace to complete successfully before triggering another pipeline in a different namespace.\n- **Scheduling:** Schedule the DAG to run at specific intervals, such as daily or hourly, or set it to be triggered manually.\n\nFor more information, see the\n[Cloud Composer overview](/composer/docs/concepts/overview).\n\n### Triggers\n\nOrchestrate pipelines with triggers\n-----------------------------------\n\nCloud Data Fusion triggers let you automatically execute a downstream\npipeline upon the completion (success, failure, or any specified condition)\nof one or more upstream pipelines.\n\nTriggers are useful for the following tasks:\n\n- Cleaning your data once, and then making it available to multiple downstream pipelines for consumption.\n- Sharing information, such as runtime arguments and plugin configurations, between pipelines. This task is called *payload\n configuration*.\n- Having a set of dynamic pipelines that run using the data from the hour, day, week, or month, instead of a static pipeline that must be updated for every run.\n\nFor example, you have a dataset that contains all information about your\ncompany's shipments. Based on this data, you want to answer several business\nquestions. To do this, you create one pipeline that cleanses the raw data\nabout shipments, called *Shipments Data Cleaning* . Then you create a second\npipeline, *Delayed Shipments USA* , which reads the cleansed data and finds\nthe shipments within the USA that were delayed by more than a specified\nthreshold. The *Delayed Shipments USA* pipeline can be triggered as soon as\nthe upstream *Shipments Data Cleaning* pipeline successfully completes.\n\nAdditionally, since the downstream pipeline consumes the output of the\nupstream pipeline, you must specify that when the downstream pipeline runs\nusing this trigger, it also receives the input directory to read from (which\nis the directory where the upstream pipeline generated its output). This\nprocess is called *passing payload configuration*, which you define with\nruntime arguments. It lets you have a set of dynamic pipelines that\nrun using the data of the hour, day, week, or month (not a static pipeline,\nwhich must be updated for every run).\n| **Note:** Don't trigger upgrades with Terraform. For more information, see the [limitations for Cloud Data Fusion upgrades](/data-fusion/docs/how-to/upgrading#limitations).\n\nTo orchestrate pipelines with triggers, follow this process:\n\n1. **Create upstream and downstream pipelines.**\n\n - In the Cloud Data Fusion Studio, design and deploy the pipelines that form your orchestration chain.\n - Consider which pipeline's completion will activate the next pipeline (downstream) in your workflow.\n2. **Optional: pass runtime arguments for upstream pipelines.**\n\n - If you need to [pass payload configuration as runtime arguments](#pass-payload-configs) between pipelines, configure runtime arguments. These arguments can be passed to the downstream pipeline during execution.\n3. **Create an inbound trigger on the downstream pipeline.**\n\n - In the Cloud Data Fusion Studio, go to the **List** page. In the **Deployed** tab, click the name of the downstream pipeline. The Deploy view for that pipeline appears.\n - On the middle left side of the page, click **Inbound triggers**. A list of available pipelines appears.\n - Click the upstream pipeline. Select one or more of the upstream pipeline completion states (**Succeeds** , **Fails** , or **Stops**) as the condition for when the downstream pipeline should run.\n - If you want the upstream pipeline to share information (called *payload configuration* ) with the downstream pipeline, click **Trigger config** and then follow the steps to [pass payload configuration as runtime arguments](#pass-payload-configs). Otherwise, click **Enable trigger**.\n4. **Test the trigger.**\n\n - Initiate a run of the upstream pipeline.\n - If the trigger is configured correctly, the downstream pipeline automatically executes upon completion of the upstream pipelines, based on your configured condition.\n\n### Pass payload configuration as runtime arguments\n\nPayload configuration allows sharing of information from the upstream\npipeline to the downstream pipeline. This information can be, for example,\nthe output directory, the data format, or the day the pipeline was run. This\ninformation is then used by the downstream pipeline for decisions such as\ndetermining the right dataset to read from.\n\nTo pass information from the upstream pipeline to the downstream pipeline,\nyou set the runtime arguments of the downstream pipeline with the values of\neither the runtime arguments or the configuration of any plugin in the\nupstream pipeline.\n\nWhenever the downstream pipeline triggers and runs, its payload\nconfiguration is set using the runtime arguments of the particular run of\nthe upstream pipeline that triggered the downstream pipeline.\n\nTo pass payload configuration as runtime arguments, follow these steps:\n\n1. Picking up where you left off in the [Creating an inbound trigger](/data-fusion/docs/how-to/using-triggers#create_inbound_trigger), after clicking **Trigger config** , any runtime arguments you [previously set](/data-fusion/docs/how-to/using-triggers#before_you_begin) for your upstream pipeline will appear. Choose the runtime arguments to pass from the upstream pipeline to the downstream pipeline when this trigger executes.\n2. Click the **Plugin config** tab to see a list of what will be passed from your upstream pipeline to your downstream pipeline when it is triggered.\n3. Click **Configure and Enable Trigger**."]]