Accede a la API de REST de Airflow

Cloud Composer 1 | Cloud Composer 2

Apache Airflow tiene una interfaz de la API de REST que puedes usar para realizar tareas como obtener información sobre las ejecuciones y tareas de DAG, actualizar los DAG, obtener la configuración de Airflow, agregar y borrar conexiones, y enumerar los usuarios.

Para ver un ejemplo del uso de la API de REST de Airflow con Cloud Functions, consulta Activa DAG con Cloud Functions.

Versiones de la API de REST de Airflow

Las siguientes versiones de la API de REST de Airflow están disponibles en Cloud Composer 2:

  • Airflow 2 usa la API de REST estable. La API experimental de REST dejó de estar disponible para Airflow.

  • Aún puedes usar la API de REST experimental en Airflow 2 si la habilitas a través de una anulación de configuración de Airflow, como se describe con más detalle.

Antes de comenzar

Habilita la API Cloud Composer.

Habilita la API

Habilita la API de REST estable de Airflow

La API de REST estable ya está habilitada de forma predeterminada en Airflow 2.

Cloud Composer usa su propio backend de autenticación de API.

La autorización funciona de la manera estándar que proporciona Airflow. Cuando un usuario nuevo autoriza a través de la API, la cuenta del usuario obtiene la función Op de forma predeterminada.

Puedes habilitar o inhabilitar la API de REST estable o cambiar la función del usuario predeterminada mediante la anulación de las siguientes opciones de configuración de Airflow:

Sección Clave Valor Notas
api auth_backend airflow.composer.api.backend.composer_auth Para inhabilitar la API de REST estable, cambia a airflow.api.auth.backend.deny_all.
api composer_auth_user_registration_role Op Puedes especificar cualquier otra función.

Habilita la API de REST de Airflow experimental

De forma predeterminada, la función de autenticación de la API está inhabilitada en la API experimental. El servidor web de Airflow rechaza todas las solicitudes que se realizan.

Para habilitar la función de autenticación de la API y la API experimental de Airflow 2, anula la siguiente opción de configuración de Airflow:

Sección Clave Valor Notas
api auth_backend airflow.api.auth.backend.default El valor predeterminado es airflow.composer.api.backend.composer_auth.
api enable_experimental_api True El valor predeterminado es False

Después de configurar la opción de configuración api-auth_backend en airflow.api.auth.backend.default, el servidor web de Airflow acepta todas las solicitudes a la API sin autenticación. Si bien el servidor web de Airflow no requiere autenticación, el servicio está protegido por Identity-Aware Proxy, que proporciona su propia capa de autenticación.

Realizar llamadas a la API de REST de Airflow

En esta sección, se proporciona un ejemplo de secuencia de comandos de Python que puedes usar para activar los DAG con la API de REST de Airflow estable.

Ingresa el contenido del siguiente ejemplo en un archivo llamado composer2_airflow_rest_api.py y, luego, proporciona la URL de la IU de Airflow, el nombre del DAG y la configuración de ejecución del DAG en los parámetros.

# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Trigger a DAG in Cloud Composer 2 environment using the Airflow 2 stable REST API."""

import google.auth
from google.auth.transport.requests import AuthorizedSession

import argparse
import json

AUTH_SCOPE = 'https://www.googleapis.com/auth/cloud-platform'

def make_composer2_web_server_request(url, method='GET', **kwargs):
  """
  Make a request to Cloud Composer 2 environment's web server.
  Args:
    url: The URL to fetch.
    method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
      'PATCH', 'DELETE')
    **kwargs: Any of the parameters defined for the request function:
              https://github.com/requests/requests/blob/master/requests/api.py
                If no timeout is provided, it is set to 90 by default.
  """

  credentials, _ = google.auth.default(scopes=[AUTH_SCOPE])
  authed_session = AuthorizedSession(credentials)

  # Set the default timeout, if missing
  if 'timeout' not in kwargs:
    kwargs['timeout'] = 90

  return authed_session.request(
    method,
    url,
    **kwargs)

def trigger_dag(webserver_url, dag_id, data, context=None):

  endpoint = f'api/v1/dags/{dag_id}/dagRuns'
  request_url = f'{webserver_url}/{endpoint}'
  json_data = { 'conf': data }

  response = make_composer2_web_server_request(request_url,
    method='POST',
    json=json_data
    )

  if response.status_code == 403:
    raise Exception('You do not have a permission to access this resource.')
  elif response.status_code != 200:
    raise Exception(
      'Bad request: {!r} / {!r} / {!r}'.format(
        response.status_code, response.headers, response.text))
  else:
    return response.text

# Usage:
# python3 composer2_airflow_rest_api.py dag_id dag_config webserver_url

# Example:
# python3 composer2_airflow_rest_api.py composer_sample_dag \
# '{"test": "value"}' \
# https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com

if __name__ == "__main__":

  parser = argparse.ArgumentParser(
      description=__doc__,
      formatter_class=argparse.RawDescriptionHelpFormatter)

  parser.add_argument('dag_id', help='ID of the DAG to run.')
  parser.add_argument(
      'dag_config',
      help='Config for the DAG run. Example: {"test": "value"}',
      type=str)
  parser.add_argument(
      'webserver_url',
      help='Airflow web server address. To obtain this URL, \
            run the following command for your environment: \
            gcloud composer environments describe example-environment \
            --location=us-central1 \
            --format="value(config.airflowUri)"')

  args = parser.parse_args()
  dag_config_json = json.loads(args.dag_config)

  response_text = trigger_dag(
    webserver_url=args.webserver_url, dag_id=args.dag_id, data=dag_config_json)

  print(response_text)

¿Qué sigue?