Accede a la API de REST de Airflow

Cloud Composer 1 | Cloud Composer 2

Apache Airflow tiene una interfaz de la API de REST que puedes usar para realizar tareas como obtener información sobre ejecuciones y tareas de DAG, actualizar DAG, obtener la configuración de Airflow, agregar y borrar conexiones, y enumerar usuarios.

Para ver un ejemplo del uso de la API de REST de Airflow con Cloud Functions, consulta Activa los DAG con Cloud Functions.

Versiones de la API de REST de Airflow

Las siguientes versiones de la API de REST de Airflow están disponibles en Cloud Composer 1:

  • Airflow 2 usa la API de REST estable. Airflow dejó de estar disponible en la API de REST experimental.

  • Aún puedes usar la API de REST experimental en Airflow 2 si la habilitas a través de una anulación de configuración de Airflow, como se describe más adelante.

Antes de comenzar

Habilita la API de Cloud Composer.

Habilita la API

Habilita la API de REST estable de Airflow

Airflow 2

La API de REST estable ya está habilitada de forma predeterminada en Airflow 2.

Cloud Composer usa su propio backend de autenticación de API, que está integrado en Identity-Aware Proxy.

La autorización funciona de la manera estándar que proporciona Airflow. Cuando un usuario nuevo se autoriza a través de la API, la cuenta del usuario obtiene la función Op de forma predeterminada.

Puedes habilitar o inhabilitar la API de REST estable o cambiar la función de usuario predeterminada si anulas las siguientes opciones de configuración de Airflow:

Sección Clave Valor Notas
api (Airflow 2.2.5 y versiones anteriores) auth_backend
(Airflow 2.3.0 y versiones posteriores) auth_backends
airflow.composer.api.backend.composer_auth Para inhabilitar la API de REST estable, cambia a airflow.api.auth.backend.deny_all.
api composer_auth_user_registration_role Op Puedes especificar cualquier otra función.

Airflow 1

La API de REST estable no está disponible en Airflow 1. En su lugar, puedes usar la API de REST experimental.

Habilita la API de REST de Airflow experimental

Airflow 2

De forma predeterminada, la función de autenticación de la API está inhabilitada en la API experimental. El servidor web de Airflow rechaza todas las solicitudes que realizas.

Para habilitar la función de autenticación de la API y la API experimental de Airflow 2, anula la siguiente opción de configuración de Airflow:

Sección Clave Valor Notas
api (Airflow 2.2.5 y versiones anteriores) auth_backend
(Airflow 2.3.0 y versiones posteriores) auth_backends
airflow.api.auth.backend.default El valor predeterminado es airflow.composer.api.backend.composer_auth.
api enable_experimental_api True El valor predeterminado es False.

Airflow 1

De forma predeterminada, la función de autenticación de la API está inhabilitada en Airflow 1.10.11 y versiones posteriores. El servidor web de Airflow rechaza todas las solicitudes que realizas. Debes usar solicitudes para activar los DAG, por lo que debes habilitar esta función.

Para habilitar la función de autenticación de API en Airflow 1, anula la siguiente opción de configuración de Airflow:

Sección Clave Valor Notas
api auth_backend airflow.api.auth.backend.default El valor predeterminado es airflow.api.auth.backend.deny_all.

Después de establecer esta opción de configuración en airflow.api.auth.backend.default, el servidor web de Airflow aceptará todas las solicitudes a la API sin autenticación. Aunque el servidor web de Airflow no requiere autenticación, aún está protegido por Identity-Aware Proxy, que proporciona su propia capa de autenticación.

Permitir llamadas a la API de REST de Airflow con el control de acceso de Webserver

Según el método que se use para llamar a la API de REST de Airflow, el método emisor puede usar una dirección IPv4 o IPv6. Recuerda desbloquear el tráfico IP a la API de REST de Airflow con el control de acceso de Webserver.

Usa la opción de configuración predeterminada, que es All IP addresses have access (default), si no estás seguro de desde qué direcciones IP se enviarán tus llamadas a la API de REST de Airflow.

Realiza llamadas a la API de REST de Airflow

Obtén el ID de cliente del proxy de IAM

Para realizar una solicitud al extremo de la API de REST de Airflow, la función requiere el ID de cliente del proxy de IAM que protege el servidor web de Airflow.

Cloud Composer no proporciona esta información directamente. En cambio, realiza una solicitud no autenticada al servidor web de Airflow y captura el ID de cliente de la URL de redireccionamiento.

cURL

curl -v AIRFLOW_URL 2>&1 >/dev/null | grep -o "client_id\=[A-Za-z0-9-]*\.apps\.googleusercontent\.com"

Reemplaza AIRFLOW_URL por la URL de la interfaz web de Airflow.

En el resultado, busca la string que sigue a client_id. Por ejemplo:

client_id=836436932391-16q2c5f5dcsfnel77va9bvf4j280t35c.apps.googleusercontent.com

Python

Guarda el siguiente código en un archivo llamado get_client_id.py. Completa tus valores para project_id, location y composer_environment. Luego, ejecuta el código en Cloud Shell o en tu entorno local.

# This script is intended to be used with Composer 1 environments
# In Composer 2, the Airflow Webserver is not in the tenant project
# so there is no tenant client ID
# See https://cloud.google.com/composer/docs/composer-2/environment-architecture
# for more details
import google.auth
import google.auth.transport.requests
import requests
import six.moves.urllib.parse

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
composer_response = authed_session.request("GET", environment_url)
environment_data = composer_response.json()
composer_version = environment_data["config"]["softwareConfig"]["imageVersion"]
if "composer-1" not in composer_version:
    version_error = (
        "This script is intended to be used with Composer 1 environments. "
        "In Composer 2, the Airflow Webserver is not in the tenant project, "
        "so there is no tenant client ID. "
        "See https://cloud.google.com/composer/docs/composer-2/environment-architecture for more details."
    )
    raise (RuntimeError(version_error))
airflow_uri = environment_data["config"]["airflowUri"]

# The Composer environment response does not include the IAP client ID.
# Make a second, unauthenticated HTTP request to the web server to get the
# redirect URI.
redirect_response = requests.get(airflow_uri, allow_redirects=False)
redirect_location = redirect_response.headers["location"]

# Extract the client_id query parameter from the redirect.
parsed = six.moves.urllib.parse.urlparse(redirect_location)
query_string = six.moves.urllib.parse.parse_qs(parsed.query)
print(query_string["client_id"][0])

Llamar a la API de REST de Airflow con client_id

Realiza los siguientes reemplazos:

  • Reemplaza el valor de la variable client_id por el valor client_id que obtuviste en el paso anterior.
  • Reemplaza el valor de la variable webserver_id por el ID de proyecto de tu usuario, que es parte de la URL de la interfaz web de Airflow antes de .appspot.com. Obtuviste la URL de la interfaz web de Airflow en un paso anterior.
  • Especifica la versión de la API de REST de Airflow que usas:

    • Si usas la API de REST de Airflow estable, establece la variable USE_EXPERIMENTAL_API en False.
    • Si usas la API de REST de Airflow experimental, no es necesario realizar cambios. La variable USE_EXPERIMENTAL_API ya está configurada como True.

from google.auth.transport.requests import Request
from google.oauth2 import id_token
import requests

IAM_SCOPE = "https://www.googleapis.com/auth/iam"
OAUTH_TOKEN_URI = "https://www.googleapis.com/oauth2/v4/token"
# If you are using the stable API, set this value to False
# For more info about Airflow APIs see https://cloud.google.com/composer/docs/access-airflow-api
USE_EXPERIMENTAL_API = True

def trigger_dag(data, context=None):
    """Makes a POST request to the Composer DAG Trigger API

    When called via Google Cloud Functions (GCF),
    data and context are Background function parameters.

    For more info, refer to
    https://cloud.google.com/functions/docs/writing/background#functions_background_parameters-python

    To call this function from a Python script, omit the ``context`` argument
    and pass in a non-null value for the ``data`` argument.

    This function is currently only compatible with Composer v1 environments.
    """

    # Fill in with your Composer info here
    # Navigate to your webserver's login page and get this from the URL
    # Or use the script found at
    # https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/composer/rest/get_client_id.py
    client_id = "YOUR-CLIENT-ID"
    # This should be part of your webserver's URL:
    # {tenant-project-id}.appspot.com
    webserver_id = "YOUR-TENANT-PROJECT"
    # The name of the DAG you wish to trigger
    dag_name = "composer_sample_trigger_response_dag"

    if USE_EXPERIMENTAL_API:
        endpoint = f"api/experimental/dags/{dag_name}/dag_runs"
        json_data = {"conf": data, "replace_microseconds": "false"}
    else:
        endpoint = f"api/v1/dags/{dag_name}/dagRuns"
        json_data = {"conf": data}
    webserver_url = "https://" + webserver_id + ".appspot.com/" + endpoint
    # Make a POST request to IAP which then Triggers the DAG
    make_iap_request(webserver_url, client_id, method="POST", json=json_data)

# This code is copied from
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/iap/make_iap_request.py
# START COPIED IAP CODE
def make_iap_request(url, client_id, method="GET", **kwargs):
    """Makes a request to an application protected by Identity-Aware Proxy.
    Args:
      url: The Identity-Aware Proxy-protected URL to fetch.
      client_id: The client ID used by Identity-Aware Proxy.
      method: The request method to use
              ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                If no timeout is provided, it is set to 90 by default.
    Returns:
      The page body, or raises an exception if the page couldn't be retrieved.
    """
    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    # Obtain an OpenID Connect (OIDC) token from metadata server or using service
    # account.
    google_open_id_connect_token = id_token.fetch_id_token(Request(), client_id)

    # Fetch the Identity-Aware Proxy-protected URL, including an
    # Authorization header containing "Bearer " followed by a
    # Google-issued OpenID Connect token for the service account.
    resp = requests.request(
        method,
        url,
        headers={"Authorization": "Bearer {}".format(google_open_id_connect_token)},
        **kwargs,
    )
    if resp.status_code == 403:
        raise Exception(
            "Service account does not have permission to "
            "access the IAP-protected application."
        )
    elif resp.status_code != 200:
        raise Exception(
            "Bad response from application: {!r} / {!r} / {!r}".format(
                resp.status_code, resp.headers, resp.text
            )
        )
    else:
        return resp.text

# END COPIED IAP CODE

Acceder a la API de REST de Airflow con una cuenta de servicio

La base de datos de Airflow limita la longitud del campo de correo electrónico a 64 caracteres. A veces, las cuentas de servicio tienen direcciones de correo electrónico de más de 64 caracteres. No es posible crear usuarios de Airflow para esas cuentas de servicio de la manera habitual. Si no hay un usuario de Airflow para esa cuenta de servicio, el acceso a la API de REST de Airflow generará errores HTTP 401 y 403.

Como solución alternativa, puedes realizar el registro previo de un usuario de Airflow para una cuenta de servicio. Para hacerlo, usa accounts.google.com:NUMERIC_USER_ID como nombre de usuario y cualquier cadena única como correo electrónico.

  1. Si quieres obtener NUMERIC_USER_ID para una cuenta de servicio, ejecuta lo siguiente:

    gcloud iam service-accounts describe \
      SA_NAME@PROJECT_ID.iam.gserviceaccount.com \
      --format="value(oauth2ClientId)"
    

    Reemplaza lo siguiente:

    • SA_NAME por el nombre de la cuenta de servicio.
    • PROJECT_ID por el ID del proyecto.
  2. Crea un usuario de Airflow con el rol Op para la cuenta de servicio:

    IU de Airflow

    1. Ve a la IU de Airflow.

    2. Ve a Administrador > Usuarios y haz clic en Crear. Tu usuario de Airflow debe tener el rol Admin para abrir esta página.

    3. Especifica accounts.google.com:NUMERIC_USER_ID como el nombre de usuario. Reemplaza NUMERIC_USER_ID por el ID de usuario que obtuviste en el paso anterior.

    4. Especifica un identificador único como el correo electrónico. Puedes usar cualquier cadena única.

    5. Especifica el rol del usuario. Por ejemplo, Op.

    6. Asegúrate de que la casilla de verificación Is Active? esté seleccionada.

    7. Especifica el nombre y el apellido del usuario. Puedes usar cualquier cadena.

    8. Haz clic en Guardar.

    gcloud

    En Airflow 2, ejecuta el siguiente comando de la CLI de Airflow:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        users create -- \
        -u accounts.google.com:NUMERIC_USER_ID \
        -e UNIQUE_ID  \
        -f UNIQUE_ID \
        -l - -r Op --use-random-password
    

    Reemplaza lo siguiente:

    • ENVIRONMENT_NAME por el nombre del entorno.
    • LOCATION por la región en la que se encuentra el entorno
    • NUMERIC_USER_ID por el ID de usuario que obtuviste en el paso anterior
    • UNIQUE_ID por el identificador para el usuario de Airflow Puedes usar cualquier cadena única.
  3. Después de crear un usuario de Airflow para una cuenta de servicio, un emisor autenticado como cuenta de servicio se reconoce como un usuario con registro previo y accede a Airflow.

Escalamiento del componente de la API de REST de Airflow

La API de REST de Airflow y los extremos de la IU de Airflow se ejecutan dentro del componente, es decir, el servidor web de Airflow. En caso de que uses la API de REST de manera intensiva, considera aumentar los parámetros de CPU y memoria para ajustar los recursos de Airflow Webserver a la carga esperada.

¿Qué sigue?