En esta página se explica la orquestación de la canalización con Cloud Composer y los activadores. Cloud Data Fusion recomienda usar Cloud Composer para orquestar flujos de procesamiento. Si necesitas una forma más sencilla de gestionar la orquestación, usa los activadores.
Composer
Orquestar flujos de procesamiento con Cloud Composer
Orquestar la ejecución de flujos de procesamiento en Cloud Data Fusion con Cloud Composer ofrece las siguientes ventajas:
- Gestión de flujos de trabajo centralizada: gestiona de forma uniforme la ejecución de varios flujos de procesamiento de Cloud Data Fusion.
- Gestión de dependencias: para asegurar el orden de ejecución adecuado, define las dependencias entre las canalizaciones.
- Monitorización y alertas: Cloud Composer ofrece funciones de monitorización y alertas por errores.
- Integración con otros servicios: Cloud Composer te permite orquestar flujos de trabajo que se extienden por Cloud Data Fusion y otros servicios deGoogle Cloud .
Para orquestar flujos de procesamiento de Cloud Data Fusion con Cloud Composer, sigue este proceso:
Configura el entorno de Cloud Composer.
- Crea un entorno de Cloud Composer. Si no tienes ninguno, aprovisiona el entorno en tu Google Cloud proyecto. Este entorno es tu espacio de trabajo de orquestación.
- Da permisos. Asegúrate de que la cuenta de servicio de Cloud Composer tenga los permisos necesarios para acceder a Cloud Data Fusion (por ejemplo, permisos para iniciar, detener y enumerar las canalizaciones).
Define grafos acíclicos dirigidos (DAG) para la orquestación.
- Crea un DAG: en Cloud Composer, crea un DAG que defina el flujo de trabajo de orquestación de tus flujos de procesamiento de Cloud Data Fusion.
- Operadores de Cloud Data Fusion: usa los operadores de Cloud Data Fusion de Cloud Composer en tu DAG. Estos operadores te permiten interactuar de forma programática con Cloud Data Fusion.
Operadores de Cloud Data Fusion
La orquestación de flujos de procesamiento de Cloud Data Fusion tiene los siguientes operadores:
CloudDataFusionStartPipelineOperator
Activa la ejecución de un flujo de procesamiento de Cloud Data Fusion por su ID. Tiene los siguientes parámetros:
- ID de flujo de procesamiento
- Ubicación (Google Cloud región)
- Espacio de nombres del flujo de procesamiento
- Argumentos de tiempo de ejecución (opcional)
- Esperar a que se complete (opcional)
- Tiempo de espera (opcional)
CloudDataFusionStopPipelineOperator
Te permite detener un flujo de procesamiento de Cloud Data Fusion en ejecución.
CloudDataFusionDeletePipelineOperator
Elimina un flujo de procesamiento de Cloud Data Fusion.
Crear el flujo de trabajo de DAG
Cuando crees el flujo de trabajo del DAG, ten en cuenta lo siguiente:
- Definir dependencias: usa la estructura de DAG para definir las dependencias entre tareas. Por ejemplo, puede que tengas una tarea que espere a que se complete correctamente una canalización en un espacio de nombres antes de activar otra canalización en un espacio de nombres diferente.
- Programación: programa el DAG para que se ejecute a intervalos específicos, como cada día o cada hora, o configúralo para que se active manualmente.
Para obtener más información, consulta la descripción general de Cloud Composer.
Activadores
Orquestar flujos de procesamiento con activadores
Los activadores de Cloud Data Fusion te permiten ejecutar automáticamente un flujo de procesamiento posterior cuando se completa (correctamente, con errores o con cualquier condición especificada) uno o varios flujos de procesamiento anteriores.
Los activadores son útiles para las siguientes tareas:
- Limpiar los datos una vez y, después, ponerlos a disposición de varios flujos de procesamiento posteriores para que los usen.
- Compartir información entre las canalizaciones, como argumentos de tiempo de ejecución y configuraciones de complementos. Esta tarea se denomina configuración de la carga útil.
- Tener un conjunto de canalizaciones dinámicas que se ejecuten con los datos de la hora, el día, la semana o el mes, en lugar de una canalización estática que deba actualizarse en cada ejecución.
Por ejemplo, tienes un conjunto de datos que contiene toda la información sobre los envíos de tu empresa. A partir de estos datos, quieres responder a varias preguntas empresariales. Para ello, creará un flujo de procesamiento que limpie los datos sin procesar sobre los envíos, llamado Limpieza de datos de envíos. A continuación, crea una segunda canalización, Envíos retrasados de EE. UU., que lee los datos depurados y busca los envíos de EE. UU. que se hayan retrasado más de un umbral especificado. El flujo de procesamiento Delayed Shipments USA se puede activar en cuanto se complete correctamente el flujo de procesamiento Shipments Data Cleaning.
Además, como el flujo de procesamiento de datos posterior consume la salida del flujo de procesamiento de datos anterior, debes especificar que, cuando se ejecute el flujo de procesamiento de datos posterior con este activador, también reciba el directorio de entrada del que debe leer (que es el directorio en el que el flujo de procesamiento de datos anterior ha generado su salida). Este proceso se denomina transferencia de configuración de carga útil, que se define con argumentos de tiempo de ejecución. Te permite tener un conjunto de canalizaciones dinámicas que se ejecutan con los datos de la hora, el día, la semana o el mes (no una canalización estática, que debe actualizarse en cada ejecución).
Para orquestar las canalizaciones con activadores, sigue este proceso:
Crea flujos de procesamiento ascendentes y descendentes.
- En Cloud Data Fusion Studio, diseña y despliega los flujos de procesamiento que forman tu cadena de orquestación.
- Ten en cuenta qué canalización activará la siguiente (la que está más abajo) en tu flujo de trabajo.
Opcional: transfiere argumentos de tiempo de ejecución a las canalizaciones de origen.
- Si necesitas transferir la configuración de la carga útil como argumentos de tiempo de ejecución entre las canalizaciones, configura los argumentos de tiempo de ejecución. Estos argumentos se pueden transferir a la canalización de nivel inferior durante la ejecución.
Crea un activador de entrada en el flujo de procesamiento de nivel inferior.
- En Cloud Data Fusion Studio, ve a la página Lista. En la pestaña Implementado, haz clic en el nombre de la canalización de nivel inferior. Se muestra la vista de implementación de esa canalización.
- En la parte central izquierda de la página, haz clic en Activadores de entrada. Aparecerá una lista de las canalizaciones disponibles.
- Haz clic en la canalización anterior. Seleccione uno o varios estados de finalización de la canalización de origen (Correcto, Fallo o Detenido) como condición para que se ejecute la canalización de destino.
- Si quieres que la canalización upstream comparta información (llamada configuración de carga útil) con la canalización downstream, haz clic en Configuración de activación y, a continuación, sigue los pasos para transferir la configuración de carga útil como argumentos de tiempo de ejecución. De lo contrario, haz clic en Habilitar activador.
Prueba el activador.
- Inicia una ejecución de la canalización upstream.
- Si el activador está configurado correctamente, la canalización de nivel inferior se ejecutará automáticamente cuando se completen las canalizaciones de nivel superior, en función de la condición que hayas configurado.
Transferir la configuración de la carga útil como argumentos del tiempo de ejecución
La configuración de la carga útil permite compartir información de la canalización upstream con la canalización downstream. Esta información puede ser, por ejemplo, el directorio de salida, el formato de los datos o el día en que se ejecutó la canalización. La canalización de nivel inferior usa esta información para tomar decisiones, como determinar el conjunto de datos adecuado del que leer.
Para transferir información del flujo de procesamiento ascendente al descendente, define los argumentos del entorno de ejecución del flujo de procesamiento descendente con los valores de los argumentos del entorno de ejecución o de la configuración de cualquier complemento del flujo de procesamiento ascendente.
Cada vez que se activa y se ejecuta la canalización de nivel inferior, su configuración de carga útil se define mediante los argumentos de tiempo de ejecución de la ejecución concreta de la canalización de nivel superior que ha activado la canalización de nivel inferior.
Para transferir la configuración de la carga útil como argumentos de tiempo de ejecución, sigue estos pasos:
- Siguiendo los pasos de la sección Crear un activador de entrada, después de hacer clic en Configuración del activador, aparecerán los argumentos de tiempo de ejecución que hayas definido anteriormente para tu canalización upstream. Elige los argumentos de tiempo de ejecución que se van a transferir de la canalización anterior a la posterior cuando se ejecute este activador.
- Haga clic en la pestaña Configuración del complemento para ver una lista de lo que se transferirá de su canalización de origen a su canalización de destino cuando se active.
- Haga clic en Configurar y habilitar activador.