Accede a la API de REST de Airflow

Cloud Composer 1 | Cloud Composer 2

Apache Airflow tiene una interfaz de la API de REST que puedes usar para realizar tareas como obtener información sobre ejecuciones y tareas de DAG, actualizar DAG, obtener la configuración de Airflow, agregar y borrar conexiones, y enumerar usuarios.

Para ver un ejemplo del uso de la API de REST de Airflow con Cloud Functions, consulta Activa los DAG con Cloud Functions.

Versiones de la API de REST de Airflow

Las siguientes versiones de la API de REST de Airflow están disponibles en Cloud Composer 2:

  • Airflow 2 usa la API de REST estable. Airflow dejó de estar disponible en la API de REST experimental.

  • Aún puedes usar la API de REST experimental en Airflow 2 si la habilitas a través de una anulación de configuración de Airflow, como se describe más adelante.

Antes de comenzar

Habilita la API de Cloud Composer.

Habilita la API

Habilita la API de REST estable de Airflow

La API de REST estable ya está habilitada de forma predeterminada en Airflow 2.

Cloud Composer usa su propio backend de autenticación de API.

La autorización funciona de la manera estándar que proporciona Airflow. Cuando un usuario nuevo se autoriza a través de la API, la cuenta del usuario obtiene la función Op de forma predeterminada.

Puedes habilitar o inhabilitar la API de REST estable o cambiar la función de usuario predeterminada si anulas las siguientes opciones de configuración de Airflow:

Sección Clave Valor Notas
api (Airflow 2.2.5 y versiones anteriores) auth_backend
(Airflow 2.3.0 y versiones posteriores) auth_backends
airflow.composer.api.backend.composer_auth Para inhabilitar la API de REST estable, cambia a airflow.api.auth.backend.deny_all.
api composer_auth_user_registration_role Op Puedes especificar cualquier otra función.

Habilita la API de REST de Airflow experimental

De forma predeterminada, la función de autenticación de la API está inhabilitada en la API experimental. El servidor web de Airflow rechaza todas las solicitudes que realizas.

Para habilitar la función de autenticación de la API y la API experimental de Airflow 2, anula la siguiente opción de configuración de Airflow:

Sección Clave Valor Notas
api (Airflow 2.2.5 y versiones anteriores) auth_backend
(Airflow 2.3.0 y versiones posteriores) auth_backends
airflow.api.auth.backend.default El valor predeterminado es airflow.composer.api.backend.composer_auth.
api enable_experimental_api True El valor predeterminado es False.

Después de establecer esta opción de configuración en airflow.api.auth.backend.default, el servidor web de Airflow aceptará todas las solicitudes a la API sin autenticación. Aunque el servidor web de Airflow no requiere autenticación, aún está protegido por Identity-Aware Proxy, que proporciona su propia capa de autenticación.

Permitir llamadas a la API de REST de Airflow con el control de acceso de Webserver

Según el método que se use para llamar a la API de REST de Airflow, el método emisor puede usar una dirección IPv4 o IPv6. Recuerda desbloquear el tráfico IP a la API de REST de Airflow con el control de acceso de Webserver.

Usa la opción de configuración predeterminada, que es All IP addresses have access (default), si no estás seguro de desde qué direcciones IP se enviarán tus llamadas a la API de REST de Airflow.

Realiza llamadas a la API de REST de Airflow

En esta sección, se proporciona una secuencia de comandos de Python de ejemplo que puedes usar para activar los DAG con la API de REST de Airflow estable.

Coloca el contenido del siguiente ejemplo en un archivo llamado composer2_airflow_rest_api.py y, luego, proporciona la URL de la IU de Airflow, el nombre del DAG y la configuración de la ejecución del DAG en los valores de variables.

from __future__ import annotations

from typing import Any

import google.auth
from google.auth.transport.requests import AuthorizedSession
import requests

# Following GCP best practices, these credentials should be
# constructed at start-up time and used throughout
# https://cloud.google.com/apis/docs/client-libraries-best-practices
AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])

def make_composer2_web_server_request(
    url: str, method: str = "GET", **kwargs: Any
) -> google.auth.transport.Response:
    """
    Make a request to Cloud Composer 2 environment's web server.
    Args:
      url: The URL to fetch.
      method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
        'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                  If no timeout is provided, it is set to 90 by default.
    """

    authed_session = AuthorizedSession(CREDENTIALS)

    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    return authed_session.request(method, url, **kwargs)

def trigger_dag(web_server_url: str, dag_id: str, data: dict) -> str:
    """
    Make a request to trigger a dag using the stable Airflow 2 REST API.
    https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html

    Args:
      web_server_url: The URL of the Airflow 2 web server.
      dag_id: The DAG ID.
      data: Additional configuration parameters for the DAG run (json).
    """

    endpoint = f"api/v1/dags/{dag_id}/dagRuns"
    request_url = f"{web_server_url}/{endpoint}"
    json_data = {"conf": data}

    response = make_composer2_web_server_request(
        request_url, method="POST", json=json_data
    )

    if response.status_code == 403:
        raise requests.HTTPError(
            "You do not have a permission to perform this operation. "
            "Check Airflow RBAC roles for your account."
            f"{response.headers} / {response.text}"
        )
    elif response.status_code != 200:
        response.raise_for_status()
    else:
        return response.text

if __name__ == "__main__":
    # TODO(developer): replace with your values
    dag_id = "your-dag-id"  # Replace with the ID of the DAG that you want to run.
    dag_config = {
        "your-key": "your-value"
    }  # Replace with configuration parameters for the DAG run.
    # Replace web_server_url with the Airflow web server address. To obtain this
    # URL, run the following command for your environment:
    # gcloud composer environments describe example-environment \
    #  --location=your-composer-region \
    #  --format="value(config.airflowUri)"
    web_server_url = (
        "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com"
    )

    response_text = trigger_dag(
        web_server_url=web_server_url, dag_id=dag_id, data=dag_config
    )

    print(response_text)

Por ejemplo, la siguiente configuración no es correcta

  web_server_url = (
    "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com/"
  )

Acceder a la API de REST de Airflow con una cuenta de servicio

La base de datos de Airflow limita la longitud del campo de correo electrónico a 64 caracteres. A veces, las cuentas de servicio tienen direcciones de correo electrónico de más de 64 caracteres. No es posible crear usuarios de Airflow para esas cuentas de servicio de la manera habitual. Si no hay un usuario de Airflow para esa cuenta de servicio, el acceso a la API de REST de Airflow generará errores HTTP 401 y 403.

Como solución alternativa, puedes realizar el registro previo de un usuario de Airflow para una cuenta de servicio. Para hacerlo, usa accounts.google.com:NUMERIC_USER_ID como nombre de usuario y cualquier cadena única como correo electrónico.

  1. Si quieres obtener NUMERIC_USER_ID para una cuenta de servicio, ejecuta lo siguiente:

    gcloud iam service-accounts describe \
      SA_NAME@PROJECT_ID.iam.gserviceaccount.com \
      --format="value(oauth2ClientId)"
    

    Reemplaza lo siguiente:

    • SA_NAME por el nombre de la cuenta de servicio.
    • PROJECT_ID por el ID del proyecto.
  2. Crea un usuario de Airflow con el rol Op para la cuenta de servicio:

    IU de Airflow

    1. Ve a la IU de Airflow.

    2. Ve a Seguridad > Lista de usuarios y haz clic en Agregar un registro nuevo. Tu usuario de Airflow debe tener la función Admin para abrir esta página.

    3. Especifica accounts.google.com:NUMERIC_USER_ID como el nombre de usuario. Reemplaza NUMERIC_USER_ID por el ID de usuario que obtuviste en el paso anterior.

    4. Especifica un identificador único como el correo electrónico. Puedes usar cualquier cadena única.

    5. Especifica el rol del usuario. Por ejemplo, Op.

    6. Asegúrate de que la casilla de verificación Is Active? esté seleccionada.

    7. Especifica el nombre y el apellido del usuario. Puedes usar cualquier cadena.

    8. Haz clic en Guardar.

    gcloud

    Ejecuta el siguiente comando de la CLI de Airflow:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        users create -- \
        -u accounts.google.com:NUMERIC_USER_ID \
        -e UNIQUE_ID  \
        -f UNIQUE_ID \
        -l - -r Op --use-random-password
    

    Reemplaza lo siguiente:

    • ENVIRONMENT_NAME por el nombre del entorno.
    • LOCATION por la región en la que se encuentra el entorno
    • NUMERIC_USER_ID por el ID de usuario que obtuviste en el paso anterior
    • UNIQUE_ID por el identificador para el usuario de Airflow Puedes usar cualquier cadena única.
  3. Después de crear un usuario de Airflow para una cuenta de servicio, un emisor autenticado como cuenta de servicio se reconoce como un usuario con registro previo y accede a Airflow.

Escalamiento del componente de la API de REST de Airflow

La API de REST de Airflow y los extremos de la IU de Airflow se ejecutan dentro del componente, es decir, el servidor web de Airflow. En caso de que uses la API de REST de manera intensiva, considera aumentar los parámetros de CPU y memoria para ajustar los recursos de Airflow Webserver a la carga esperada.

¿Qué sigue?