Activa DAG con Cloud Functions

Cloud Composer 1 | Cloud Composer 2

En esta página, se describe cómo usar Cloud Functions para activar DAG en respuesta a eventos.

Airflow está diseñado para ejecutar DAG de forma periódica, pero también puedes activar DAG en respuesta a eventos. Una forma de hacerlo es usar Cloud Functions para activar los DAG de Cloud Composer cuando se produce un evento especificado. Por ejemplo, puedes crear una función que active un DAG cuando un objeto cambie en un bucket de Cloud Storage o cuando un mensaje se envíe a un tema de Pub/Sub.

En el ejemplo de esta guía, se ejecuta un DAG cada vez que se produce un cambio en un bucket de Cloud Storage. Los cambios en cualquier objeto de un bucket activan una función. Esta función realiza una solicitud a la API de REST de Airflow del entorno de Cloud Composer. Airflow procesa esta solicitud y ejecuta un DAG. El DAG genera información sobre el cambio.

Antes de comenzar

Habilita las API para tu proyecto.

Console

  • Habilita las API de Cloud Composer and Cloud Functions.

    Habilita las API

  • gcloud

  • Habilita las API de Cloud Composer and Cloud Functions.

    gcloud services enable cloudfunctions.googleapis.comcomposer.googleapis.com
  • Habilita la API de REST de Airflow

    Para Airflow 2, la API de REST estable ya está habilitada de forma predeterminada. Si tu entorno tiene inhabilitada la API estable, habilita la API estable de REST.

    Cree un bucket de Cloud Storage

    Este ejemplo activa un DAG en respuesta a los cambios en un bucket de Cloud Storage. Crea un bucket nuevo para usar en este ejemplo.

    Obtén la URL del servidor web de Airflow

    En este ejemplo, se realizan solicitudes a la API de REST al extremo del servidor web de Airflow. Usa la URL del servidor web de Airflow en tu código de Cloud Functions.

    Console

    1. En Google Cloud Console, ve a la página Entornos.

      Ir a Entornos

    2. Haz clic en el nombre de tu entorno.

    3. En la página Detalles del entorno, ve a la pestaña Detalles del entorno.

    4. La URL del servidor web de Airflow aparece en el elemento IU web de Airflow.

    gcloud

    Ejecuta el siguiente comando:

    gcloud composer environments describe ENVIRONMENT_NAME \
        --location LOCATION \
        --format='value(config.airflowUri)'
    

    Reemplaza lo siguiente:

    • ENVIRONMENT_NAME por el nombre del entorno.
    • LOCATION por la región donde se encuentra el entorno

    Activa un DAG desde Cloud Functions

    Sube un DAG a tu entorno

    Sube un DAG a tu entorno. En el siguiente ejemplo de DAG, se muestra la configuración de ejecución del DAG recibido. Debes activar este DAG desde una función que crearás más adelante en esta guía.

    import datetime
    
    import airflow
    from airflow.operators.bash_operator import BashOperator
    
    with airflow.DAG(
            'composer_sample_trigger_response_dag',
            start_date=datetime.datetime(2021, 1, 1),
            # Not scheduled, trigger only
            schedule_interval=None) as dag:
    
        # Print the dag_run's configuration, which includes information about the
        # Cloud Storage object change.
        print_gcs_info = BashOperator(
            task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

    Implementa una función de Cloud Functions que active el DAG

    Implementa una función de Cloud Functions de Python mediante los siguientes parámetros y contenido de configuración.

    Especifica los parámetros de configuración de Cloud Functions

    • Activador Para este ejemplo, selecciona un activador que funcione cuando se cree un objeto nuevo en un bucket o se reemplace un objeto existente.

      • Tipo de activador Cloud Storage

      • Tipo de evento Finalizar/Crear

      • Bucket Selecciona un bucket que debe activar esta función.

      • Volver a intentar en caso de error Te recomendamos inhabilitar esta opción para los fines de este ejemplo. Si usas tu propia función en un entorno de producción, habilita esta opción para controlar errores transitorios.

    • Cuenta de servicio del entorno de ejecución, en la sección Entorno de ejecución, compilación, conexiones y configuración de seguridad. Usa una de las siguientes opciones, según tus preferencias:

      • Selecciona Cuenta de servicio predeterminada de Compute Engine. Con los permisos de IAM predeterminados, esta cuenta puede ejecutar funciones que acceden a entornos de Cloud Composer.

      • Crea una cuenta de servicio personalizada que tenga la función de usuario de Composer y especifícala como una cuenta de servicio del entorno de ejecución para esta función. Esta opción sigue el principio de privilegio mínimo.

    • Entorno de ejecución y punto de entrada, en el paso Código. Cuando agregues código para este ejemplo, selecciona el entorno de ejecución de Python 3.7 o superior, y especifica trigger_dag_gcf como el punto de entrada.

    Agrega requisitos

    Especifica las dependencias en el archivo requirements.txt:

    google-auth==2.6.2
    requests==2.27.1
    

    Agregar el código para activar los DAG con la API de REST de Airflow

    Crea un archivo llamado composer2_airflow_rest_api.py y coloca el código para realizar llamadas a la API de REST de Airflow en este archivo.

    No cambies ninguna variable. La Cloud Function importa este archivo desde el archivo main.py.

    from typing import Any
    
    import google.auth
    from google.auth.transport.requests import AuthorizedSession
    import requests
    
    # Following GCP best practices, these credentials should be
    # constructed at start-up time and used throughout
    # https://cloud.google.com/apis/docs/client-libraries-best-practices
    AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
    CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])
    
    def make_composer2_web_server_request(url: str, method: str = "GET", **kwargs: Any) -> google.auth.transport.Response:
        """
        Make a request to Cloud Composer 2 environment's web server.
        Args:
          url: The URL to fetch.
          method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
            'PATCH', 'DELETE')
          **kwargs: Any of the parameters defined for the request function:
                    https://github.com/requests/requests/blob/master/requests/api.py
                      If no timeout is provided, it is set to 90 by default.
        """
    
        authed_session = AuthorizedSession(CREDENTIALS)
    
        # Set the default timeout, if missing
        if "timeout" not in kwargs:
            kwargs["timeout"] = 90
    
        return authed_session.request(method, url, **kwargs)
    
    def trigger_dag(web_server_url: str, dag_id: str, data: dict) -> str:
        """
        Make a request to trigger a dag using the stable Airflow 2 REST API.
        https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html
    
        Args:
          web_server_url: The URL of the Airflow 2 web server.
          dag_id: The DAG ID.
          data: Additional configuration parameters for the DAG run (json).
        """
    
        endpoint = f"api/v1/dags/{dag_id}/dagRuns"
        request_url = f"{web_server_url}/{endpoint}"
        json_data = {"conf": data}
    
        response = make_composer2_web_server_request(
            request_url, method="POST", json=json_data
        )
    
        if response.status_code == 403:
            raise requests.HTTPError(
                "You do not have a permission to perform this operation. "
                "Check Airflow RBAC roles for your account."
                f"{response.headers} / {response.text}"
            )
        elif response.status_code != 200:
            response.raise_for_status()
        else:
            return response.text

    Agrega el código de la función de Cloud Functions

    Ingresa el siguiente código en el archivo main.py. Reemplaza el valor de la variable web_server_url por la dirección del servidor web de Airflow que obtuviste antes.

    # Copyright 2021 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    """
    Trigger a DAG in a Cloud Composer 2 environment in response to an event,
    using Cloud Functions.
    """
    
    from typing import Any
    
    import composer2_airflow_rest_api
    
    def trigger_dag_gcf(data, context=None):
        """
        Trigger a DAG and pass event data.
    
        Args:
          data: A dictionary containing the data for the event. Its format depends
          on the event.
          context: The context object for the event.
    
        For more information about the arguments, see:
        https://cloud.google.com/functions/docs/writing/background#function_parameters
        """
    
        # TODO(developer): replace with your values
        # Replace web_server_url with the Airflow web server address. To obtain this
        # URL, run the following command for your environment:
        # gcloud composer environments describe example-environment \
        #  --location=your-composer-region \
        #  --format="value(config.airflowUri)"
        web_server_url = (
            "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com"
        )
        # Replace with the ID of the DAG that you want to run.
        dag_id = 'composer_sample_trigger_response_dag'
    
        composer2_airflow_rest_api.trigger_dag(web_server_url, dag_id, data)
    

    Prueba la función

    Para verificar que tu función y DAG funcionen según lo previsto, haz lo siguiente:

    1. Espera hasta que se implemente la función.
    2. Sube un archivo a tu bucket de Cloud Storage. Como alternativa, puedes activar la función de forma manual si seleccionas la acción Probar función en Google Cloud Console.
    3. Consulta la página del DAG en la interfaz web de Airflow. El DAG debe tener una DAG activa o ya completada.
    4. En la IU de Airflow, verifique los registros de tareas de esta ejecución. Deberías ver que la tarea print_gcs_info genera los datos recibidos de la función a los registros:
    [2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
    [2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
        {bucket: example-storage-for-gcf-triggers, contentType: text/plain,
        crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
        ... }
    [2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
        return code 0h
    

    ¿Qué sigue?