Organiza tus páginas con colecciones
Guarda y categoriza el contenido según tus preferencias.
En esta página, se explica la orquestación de canalización con Cloud Composer y los activadores. Cloud Data Fusion recomienda usar Cloud Composer para orquestar canalizaciones. Si necesitas una forma más sencilla de administrar la orquestación, usa
activadores.
Compositor
Organiza canalizaciones con Cloud Composer
La orquestación de la ejecución de canalizaciones en Cloud Data Fusion con
Cloud Composer proporciona los siguientes beneficios:
Administración centralizada de flujos de trabajo: Administra de forma uniforme la ejecución de varias canalizaciones de Cloud Data Fusion.
Administración de dependencias: Para garantizar el orden de ejecución correcto, define las dependencias entre las canalizaciones.
Supervisión y alertas: Cloud Composer proporciona funciones de supervisión y alertas para fallas.
Integración con otros servicios: Cloud Composer te permite
organizar flujos de trabajo que abarcan Cloud Data Fusion y otros
servicios deGoogle Cloud .
Para organizar las canalizaciones de Cloud Data Fusion con
Cloud Composer, sigue este proceso:
Configura el entorno de Cloud Composer.
Crea un entorno de Cloud Composer. Si no tienes uno, aprovisiona el entorno en tu Google Cloud proyecto.
Este entorno es tu lugar de trabajo de orquestación.
Otorga permisos. Asegúrate de que la cuenta de servicio de Cloud Composer tenga los permisos necesarios para acceder a Cloud Data Fusion (como el permiso para iniciar, detener y enumerar canalizaciones).
Define grafos acíclicos dirigidos (DAG) para la orquestación.
Crea un DAG: En Cloud Composer, crea un DAG que
defina el flujo de trabajo de orquestación para tus canalizaciones de Cloud Data Fusion.
Operadores de Cloud Data Fusion: Usa los operadores de Cloud Data Fusion de Cloud Composer en tu DAG. Estos operadores te permiten interactuar de manera programática con Cloud Data Fusion.
Operadores de Cloud Data Fusion
La orquestación de canalizaciones de Cloud Data Fusion tiene los siguientes operadores:
CloudDataFusionStartPipelineOperator
Activa la ejecución de una canalización de Cloud Data Fusion por su ID. Tiene los siguientes parámetros:
ID de la canalización
Ubicación (regiónGoogle Cloud )
Espacio de nombres de la canalización
Argumentos del entorno de ejecución (opcional)
Espera a que se complete (opcional)
Tiempo de espera (opcional)
CloudDataFusionStopPipelineOperator
Te permite detener una canalización de Cloud Data Fusion en ejecución.
CloudDataFusionDeletePipelineOperator
Borra una canalización de Cloud Data Fusion.
Compila el flujo de trabajo de DAG
Cuando crees el flujo de trabajo de DAG, ten en cuenta lo siguiente:
Definir dependencias: Usa la estructura de DAG para definir las dependencias entre tareas. Por ejemplo, puedes tener una tarea que espere a que una canalización en un espacio de nombres se complete correctamente antes de activar otra canalización en un espacio de nombres diferente.
Programación: Programa el DAG para que se ejecute en intervalos específicos, como a diario o por hora, o configúralo para que se active de forma manual.
Los activadores de Cloud Data Fusion te permiten ejecutar automáticamente una canalización descendente cuando se completa una o más canalizaciones ascendentes (con éxito, error o cualquier condición especificada).
Los activadores son útiles para las siguientes tareas:
Limpiar tus datos una vez y hacer que estén disponibles para varias canalizaciones descendentes a fin de consumirlos.
Compartir información, como argumentos de entorno de ejecución y configuraciones de complementos, entre canalizaciones. Esta tarea se denomina configuración de carga útil.
Tener un conjunto de canalizaciones dinámicas que se ejecuten con los datos de la hora, el día, la semana o el mes, en lugar de una canalización estática que se debe actualizar para cada ejecución.
Por ejemplo, tienes un conjunto de datos que contiene toda la información sobre los envíos de tu
empresa. En función de estos datos, quieres responder varias preguntas
empresariales. Para ello, creas una canalización que limpia los datos sin procesar
sobre los envíos, llamada Limpieza de datos de envíos. Luego, creas una segunda canalización, Delayed Shipments USA, que lee los datos limpios y encuentra los envíos dentro de EE.UU. que se retrasaron por más de un límite especificado. La canalización de envíos retrasados de EE.UU. se puede activar en cuanto se complete correctamente la canalización de limpieza de datos de envíos upstream.
Además, como la canalización descendente consume el resultado de la canalización ascendente, debes especificar que, cuando se ejecuta la canalización descendente con este activador, también recibe el directorio de entrada del que se debe leer (que es el directorio en el que la canalización ascendente generó su resultado). Este proceso se denomina pasar la configuración de la carga útil, que defines con argumentos de tiempo de ejecución. Te permite tener un conjunto de canalizaciones dinámicas que se ejecutan con los datos de la hora, el día, la semana o el mes (no una canalización estática, que se debe actualizar para cada ejecución).
Para orquestar canalización con activadores, sigue este proceso:
Crea canalizaciones ascendentes y descendentes.
En Cloud Data Fusion Studio, diseña e implementa las
canalizaciones que forman tu cadena de orquestación.
Considera qué canalización completará la siguiente
canalización (aguas abajo) en tu flujo de trabajo.
Opcional: Pasa argumentos de tiempo de ejecución para las canalizaciones upstream.
Crea un activador entrante en la canalización descendente.
En Studio de Cloud Data Fusion, ve a la página List. En la pestaña Implementada, haz clic en el nombre de la canalización descendente. Aparecerá la
vista Implementar de esa canalización.
En el lado medio izquierdo de la página, haz clic en Activadores entrantes.
Aparecerá una lista de las canalizaciones disponibles.
Haz clic en la canalización ascendente. Selecciona uno o más de los estados de finalización de la canalización ascendente (Succeeds, Fails o Stops) como la condición para cuándo se debe ejecutar la canalización descendente.
Si deseas que la canalización ascendente comparta información (llamada configuración de la carga útil) con la canalización descendente, haz clic en Trigger config y, luego, sigue los pasos para pasar la configuración de la carga útil como argumentos de entorno de ejecución.
De lo contrario, haz clic en Habilitar activador.
Prueba el activador.
Inicia una ejecución de la canalización ascendente.
Si el activador se configuró de forma correcta, la canalización downstream
se ejecutará automáticamente cuando se completen las canalizaciones upstream,
según la condición que hayas configurado.
Pasa la configuración de la carga útil como argumentos del entorno de ejecución
La configuración de la carga útil permite compartir información de la canalización ascendente con la canalización descendente. Esta información puede ser, por ejemplo,
el directorio de salida, el formato de datos o el día en que se ejecutó la canalización. Luego, la canalización descendente usa esta información para tomar decisiones, como determinar el conjunto de datos correcto del que se debe leer.
Para pasar información de la canalización ascendente a la canalización descendente, debes configurar los argumentos del entorno de ejecución de la canalización descendente con los valores de los argumentos del entorno de ejecución o la configuración de cualquier complemento en la canalización ascendente.
Cada vez que se activa y ejecuta la canalización descendente, su configuración de carga útil se establece con los argumentos del entorno de ejecución de la ejecución en particular de la canalización ascendente que activó la canalización descendente.
Para pasar la configuración de la carga útil como argumentos del entorno de ejecución, sigue estos pasos:
Siguiendo desde donde quedaste en Cómo crear un activador entrante, después de hacer clic en Configuración del activador, aparecerán todos los argumentos de entorno de ejecución que configuraste anteriormente para tu canalización ascendente. Elige los argumentos de tiempo de ejecución que se pasarán de la canalización ascendente a la canalización descendente cuando se ejecute este activador.
Haz clic en la pestaña Configuración del complemento para ver una lista de lo que se pasará de la canalización ascendente a la canalización descendente cuando se active.
[[["Fácil de comprender","easyToUnderstand","thumb-up"],["Resolvió mi problema","solvedMyProblem","thumb-up"],["Otro","otherUp","thumb-up"]],[["Difícil de entender","hardToUnderstand","thumb-down"],["Información o código de muestra incorrectos","incorrectInformationOrSampleCode","thumb-down"],["Faltan la información o los ejemplos que necesito","missingTheInformationSamplesINeed","thumb-down"],["Problema de traducción","translationIssue","thumb-down"],["Otro","otherDown","thumb-down"]],["Última actualización: 2025-09-04 (UTC)"],[[["\u003cp\u003eCloud Composer can orchestrate multiple Cloud Data Fusion pipelines, offering centralized workflow and dependency management, monitoring, alerting, and integration with other Google Cloud services.\u003c/p\u003e\n"],["\u003cp\u003eCloud Composer uses Directed Acyclic Graphs (DAGs) and Cloud Data Fusion Operators to define and manage pipeline orchestration, including starting, stopping, and deleting pipelines.\u003c/p\u003e\n"],["\u003cp\u003eTriggers in Cloud Data Fusion allow automatic execution of downstream pipelines upon completion of upstream pipelines, based on success, failure, or other conditions.\u003c/p\u003e\n"],["\u003cp\u003eTriggers facilitate dynamic pipelines by enabling the sharing of runtime arguments and plugin configurations (payload configuration) between upstream and downstream pipelines.\u003c/p\u003e\n"],["\u003cp\u003eUsing payload configuration with triggers, the downstream pipeline can receive information, such as output directory and data format, from the upstream pipeline.\u003c/p\u003e\n"]]],[],null,["# Orchestrate pipelines\n\nThis page explains pipeline orchestration with Cloud Composer and\ntriggers. Cloud Data Fusion recommends using Cloud Composer to\norchestrate pipelines. If you require a simpler way to manage orchestration, use\ntriggers. \n\n### Composer\n\nOrchestrate pipelines with Cloud Composer\n-----------------------------------------\n\nOrchestrating pipeline execution in Cloud Data Fusion with\nCloud Composer provides following benefits:\n\n- **Centralized workflow management:** uniformly manage the execution of multiple Cloud Data Fusion pipelines.\n- **Dependency management:** to ensure proper execution order, define dependencies between pipelines.\n- **Monitoring and alerting:** Cloud Composer provides monitoring capabilities and alerts for failures.\n- **Integration with other services:** Cloud Composer lets you orchestrate workflows that span across Cloud Data Fusion and other Google Cloud services.\n\nTo orchestrate Cloud Data Fusion pipelines using\nCloud Composer, follow this process:\n\n1. **Set up the Cloud Composer environment.**\n\n - **Create a Cloud Composer environment.** If you don't have one, provision the environment in your Google Cloud project. This environment is your orchestration workspace.\n - **Give permissions.** Ensure the Cloud Composer service account has the necessary permissions to access Cloud Data Fusion (such as permission to start, stop, and list pipelines).\n2. **Define Directed Acyclic Graphs (DAG) for orchestration.**\n\n - **Create a DAG:** In Cloud Composer, create a DAG that defines the orchestration workflow for your Cloud Data Fusion pipelines.\n - **Cloud Data Fusion Operators:** Use Cloud Composer's Cloud Data Fusion Operators within your DAG. These operators let you interact programmatically with Cloud Data Fusion.\n\n### Cloud Data Fusion operators\n\nCloud Data Fusion pipeline orchestration has the following operators:\n\n`CloudDataFusionStartPipelineOperator`\n\n: Triggers the execution of a Cloud Data Fusion pipeline by its ID. It\n has the following parameters:\n\n - Pipeline ID\n - Location (Google Cloud region)\n - Pipeline namespace\n - Runtime arguments (optional)\n - Wait for completion (optional)\n - Timeout (optional)\n\n`CloudDataFusionStopPipelineOperator`\n\n: Lets you stop a running Cloud Data Fusion pipeline.\n\n`CloudDataFusionDeletePipelineOperator`\n\n: Deletes a Cloud Data Fusion pipeline.\n\n### Build the DAG workflow\n\nWhen you build the DAG workflow, consider the following:\n\n- **Defining dependencies:** Use the DAG structure to define dependencies between tasks. For example, you might have a task that waits for a pipeline in one namespace to complete successfully before triggering another pipeline in a different namespace.\n- **Scheduling:** Schedule the DAG to run at specific intervals, such as daily or hourly, or set it to be triggered manually.\n\nFor more information, see the\n[Cloud Composer overview](/composer/docs/concepts/overview).\n\n### Triggers\n\nOrchestrate pipelines with triggers\n-----------------------------------\n\nCloud Data Fusion triggers let you automatically execute a downstream\npipeline upon the completion (success, failure, or any specified condition)\nof one or more upstream pipelines.\n\nTriggers are useful for the following tasks:\n\n- Cleaning your data once, and then making it available to multiple downstream pipelines for consumption.\n- Sharing information, such as runtime arguments and plugin configurations, between pipelines. This task is called *payload\n configuration*.\n- Having a set of dynamic pipelines that run using the data from the hour, day, week, or month, instead of a static pipeline that must be updated for every run.\n\nFor example, you have a dataset that contains all information about your\ncompany's shipments. Based on this data, you want to answer several business\nquestions. To do this, you create one pipeline that cleanses the raw data\nabout shipments, called *Shipments Data Cleaning* . Then you create a second\npipeline, *Delayed Shipments USA* , which reads the cleansed data and finds\nthe shipments within the USA that were delayed by more than a specified\nthreshold. The *Delayed Shipments USA* pipeline can be triggered as soon as\nthe upstream *Shipments Data Cleaning* pipeline successfully completes.\n\nAdditionally, since the downstream pipeline consumes the output of the\nupstream pipeline, you must specify that when the downstream pipeline runs\nusing this trigger, it also receives the input directory to read from (which\nis the directory where the upstream pipeline generated its output). This\nprocess is called *passing payload configuration*, which you define with\nruntime arguments. It lets you have a set of dynamic pipelines that\nrun using the data of the hour, day, week, or month (not a static pipeline,\nwhich must be updated for every run).\n| **Note:** Don't trigger upgrades with Terraform. For more information, see the [limitations for Cloud Data Fusion upgrades](/data-fusion/docs/how-to/upgrading#limitations).\n\nTo orchestrate pipelines with triggers, follow this process:\n\n1. **Create upstream and downstream pipelines.**\n\n - In the Cloud Data Fusion Studio, design and deploy the pipelines that form your orchestration chain.\n - Consider which pipeline's completion will activate the next pipeline (downstream) in your workflow.\n2. **Optional: pass runtime arguments for upstream pipelines.**\n\n - If you need to [pass payload configuration as runtime arguments](#pass-payload-configs) between pipelines, configure runtime arguments. These arguments can be passed to the downstream pipeline during execution.\n3. **Create an inbound trigger on the downstream pipeline.**\n\n - In the Cloud Data Fusion Studio, go to the **List** page. In the **Deployed** tab, click the name of the downstream pipeline. The Deploy view for that pipeline appears.\n - On the middle left side of the page, click **Inbound triggers**. A list of available pipelines appears.\n - Click the upstream pipeline. Select one or more of the upstream pipeline completion states (**Succeeds** , **Fails** , or **Stops**) as the condition for when the downstream pipeline should run.\n - If you want the upstream pipeline to share information (called *payload configuration* ) with the downstream pipeline, click **Trigger config** and then follow the steps to [pass payload configuration as runtime arguments](#pass-payload-configs). Otherwise, click **Enable trigger**.\n4. **Test the trigger.**\n\n - Initiate a run of the upstream pipeline.\n - If the trigger is configured correctly, the downstream pipeline automatically executes upon completion of the upstream pipelines, based on your configured condition.\n\n### Pass payload configuration as runtime arguments\n\nPayload configuration allows sharing of information from the upstream\npipeline to the downstream pipeline. This information can be, for example,\nthe output directory, the data format, or the day the pipeline was run. This\ninformation is then used by the downstream pipeline for decisions such as\ndetermining the right dataset to read from.\n\nTo pass information from the upstream pipeline to the downstream pipeline,\nyou set the runtime arguments of the downstream pipeline with the values of\neither the runtime arguments or the configuration of any plugin in the\nupstream pipeline.\n\nWhenever the downstream pipeline triggers and runs, its payload\nconfiguration is set using the runtime arguments of the particular run of\nthe upstream pipeline that triggered the downstream pipeline.\n\nTo pass payload configuration as runtime arguments, follow these steps:\n\n1. Picking up where you left off in the [Creating an inbound trigger](/data-fusion/docs/how-to/using-triggers#create_inbound_trigger), after clicking **Trigger config** , any runtime arguments you [previously set](/data-fusion/docs/how-to/using-triggers#before_you_begin) for your upstream pipeline will appear. Choose the runtime arguments to pass from the upstream pipeline to the downstream pipeline when this trigger executes.\n2. Click the **Plugin config** tab to see a list of what will be passed from your upstream pipeline to your downstream pipeline when it is triggered.\n3. Click **Configure and Enable Trigger**."]]