Cloud Composer 1 | Cloud Composer 2
En esta página, se describe cómo usar Cloud Functions para activar DAG en respuesta a eventos.
Airflow está diseñado para ejecutar DAG de forma periódica, pero también puedes activar DAG en respuesta a eventos. Una forma de hacerlo es usar Cloud Functions para activar los DAG de Cloud Composer cuando se produce un evento especificado. Por ejemplo, puedes crear una función que active un DAG cuando un objeto cambie en un bucket de Cloud Storage o cuando un mensaje se envíe a un tema de Pub/Sub.
En el ejemplo de esta guía, se ejecuta un DAG cada vez que se produce un cambio en un bucket de Cloud Storage. Los cambios en cualquier objeto de un bucket activan una función. Esta función realiza una solicitud a la API de REST de Airflow del entorno de Cloud Composer. Airflow procesa esta solicitud y ejecuta un DAG. El DAG genera información sobre el cambio.
Antes de comenzar
Habilita las API para tu proyecto.
Console
Habilita las API de Cloud Composer and Cloud Functions.
gcloud
Habilita las API de Cloud Composer and Cloud Functions.
gcloud services enable cloudfunctions.googleapis.comcomposer.googleapis.com
Habilita la API de REST de Airflow
Para Airflow 2, la API de REST estable ya está habilitada de forma predeterminada. Si tu entorno tiene inhabilitada la API estable, habilita la API estable de REST.
Cree un bucket de Cloud Storage
Este ejemplo activa un DAG en respuesta a los cambios en un bucket de Cloud Storage. Crea un bucket nuevo para usar en este ejemplo.
Obtén la URL del servidor web de Airflow
En este ejemplo, se realizan solicitudes a la API de REST al extremo del servidor web de Airflow. Usa la URL del servidor web de Airflow en tu código de Cloud Functions.
Console
En Google Cloud Console, ve a la página Entornos.
Haz clic en el nombre de tu entorno.
En la página Detalles del entorno, ve a la pestaña Detalles del entorno.
La URL del servidor web de Airflow aparece en el elemento IU web de Airflow.
gcloud
Ejecuta el siguiente comando:
gcloud composer environments describe ENVIRONMENT_NAME \
--location LOCATION \
--format='value(config.airflowUri)'
Reemplaza lo siguiente:
ENVIRONMENT_NAME
por el nombre del entorno.LOCATION
por la región donde se encuentra el entorno
Activa un DAG desde Cloud Functions
Sube un DAG a tu entorno
Sube un DAG a tu entorno. En el siguiente ejemplo de DAG, se muestra la configuración de ejecución del DAG recibido. Debes activar este DAG desde una función que crearás más adelante en esta guía.
Implementa una función de Cloud Functions que active el DAG
Implementa una función de Cloud Functions de Python mediante los siguientes parámetros y contenido de configuración.
Especifica los parámetros de configuración de Cloud Functions
Activador Para este ejemplo, selecciona un activador que funcione cuando se cree un objeto nuevo en un bucket o se reemplace un objeto existente.
Tipo de activador Cloud Storage
Tipo de evento Finalizar/Crear
Bucket Selecciona un bucket que debe activar esta función.
Volver a intentar en caso de error Te recomendamos inhabilitar esta opción para los fines de este ejemplo. Si usas tu propia función en un entorno de producción, habilita esta opción para controlar errores transitorios.
Cuenta de servicio del entorno de ejecución, en la sección Entorno de ejecución, compilación, conexiones y configuración de seguridad. Usa una de las siguientes opciones, según tus preferencias:
Selecciona Cuenta de servicio predeterminada de Compute Engine. Con los permisos de IAM predeterminados, esta cuenta puede ejecutar funciones que acceden a entornos de Cloud Composer.
Crea una cuenta de servicio personalizada que tenga la función de usuario de Composer y especifícala como una cuenta de servicio del entorno de ejecución para esta función. Esta opción sigue el principio de privilegio mínimo.
Entorno de ejecución y punto de entrada, en el paso Código. Cuando agregues código para este ejemplo, selecciona el entorno de ejecución de Python 3.7 o superior, y especifica
trigger_dag_gcf
como el punto de entrada.
Agrega requisitos
Especifica las dependencias en el archivo requirements.txt
:
Agregar el código para activar los DAG con la API de REST de Airflow
Crea un archivo llamado composer2_airflow_rest_api.py
y coloca el código para realizar llamadas a la API de REST de Airflow en este archivo.
No cambies ninguna variable. La Cloud Function importa este archivo desde el archivo main.py
.
Agrega el código de la función de Cloud Functions
Ingresa el siguiente código en el archivo main.py
. Reemplaza el valor de la variable web_server_url
por la dirección del servidor web de Airflow que obtuviste antes.
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Trigger a DAG in a Cloud Composer 2 environment in response to an event,
using Cloud Functions.
"""
from typing import Any
import composer2_airflow_rest_api
def trigger_dag_gcf(data, context=None):
"""
Trigger a DAG and pass event data.
Args:
data: A dictionary containing the data for the event. Its format depends
on the event.
context: The context object for the event.
For more information about the arguments, see:
https://cloud.google.com/functions/docs/writing/background#function_parameters
"""
# TODO(developer): replace with your values
# Replace web_server_url with the Airflow web server address. To obtain this
# URL, run the following command for your environment:
# gcloud composer environments describe example-environment \
# --location=your-composer-region \
# --format="value(config.airflowUri)"
web_server_url = (
"https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com"
)
# Replace with the ID of the DAG that you want to run.
dag_id = 'composer_sample_trigger_response_dag'
composer2_airflow_rest_api.trigger_dag(web_server_url, dag_id, data)
Prueba la función
Para verificar que tu función y DAG funcionen según lo previsto, haz lo siguiente:
- Espera hasta que se implemente la función.
- Sube un archivo a tu bucket de Cloud Storage. Como alternativa, puedes activar la función de forma manual si seleccionas la acción Probar función en Google Cloud Console.
- Consulta la página del DAG en la interfaz web de Airflow. El DAG debe tener una DAG activa o ya completada.
- En la IU de Airflow, verifique los registros de tareas de esta ejecución. Deberías ver que la tarea
print_gcs_info
genera los datos recibidos de la función a los registros:
[2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
[2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
{bucket: example-storage-for-gcf-triggers, contentType: text/plain,
crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
... }
[2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
return code 0h
¿Qué sigue?
- Acceda a la IU de Airflow
- Accede a la API de REST de Airflow
- Escribir DAG
- Escribe Cloud Functions
- Activadores de Cloud Storage