Acessar a API REST do Airflow

Cloud Composer 1 | Cloud Composer 2

O Apache Airflow tem uma interface da API REST que pode ser usada para executar tarefas, como ver informações sobre execuções e tarefas do DAG, atualizar DAGs, configurar o Airflow, adicionar e excluir conexões e listar usuários.

Para um exemplo de como usar a API REST do Airflow com o Cloud Functions, consulte Como acionar DAGs com o Cloud Functions.

Versões da API REST do Airflow

As seguintes versões da API REST do Airflow estão disponíveis no Cloud Composer 1:

  • O Airflow 2 usa a API REST estável. A API REST experimental está obsoleta pelo Airflow.

  • Ainda será possível usar a API REST experimental no Airflow 2 se você ativá-la por meio de uma modificação de configuração, como descrito em mais detalhes.

Antes de começar

Ative a API Cloud Composer.

Ative a API

Ativar a API REST estável do Airflow

Airflow 2

A API REST estável já está ativada por padrão no Airflow 2.

O Cloud Composer usa o próprio back-end de autenticação da API, integrado ao Identity-Aware Proxy.

A autorização funciona da maneira padrão fornecida pelo Airflow. Quando um novo usuário autoriza por meio da API, a conta do usuário recebe o papel Op por padrão.

É possível ativar ou desativar a API REST estável ou alterar o papel de usuário padrão modificando as seguintes opções de configuração do Airflow:

Seção Chave Valor Observações
api auth_backend airflow.composer.api.backend.composer_auth Para desativar a API REST estável, mude para airflow.api.auth.backend.deny_all.
api composer_auth_user_registration_role Op É possível especificar qualquer outro papel.

Airflow 1

A API REST estável não está disponível no Airflow 1. Em vez disso, use a API REST experimental.

Ativar a API REST experimental do Airflow

Airflow 2

Por padrão, o recurso de autenticação da API está desativado na API experimental. O servidor da Web do Airflow nega todas as solicitações feitas.

Para ativar o recurso de autenticação da API e a API experimental Airflow 2, modifique a seguinte opção de configuração:

Seção Chave Valor Observações
api auth_backend airflow.api.auth.backend.default O padrão é airflow.composer.api.backend.composer_auth.
api enable_experimental_api True O padrão é False

Airflow 1

Por padrão, o recurso de autenticação da API está desativado no Airflow 1.10.11 e versões posteriores. O servidor da Web do Airflow nega todas as solicitações feitas. Como você usa as solicitações para acionar DAGs, ative esse recurso.

Para ativar o recurso de autenticação da API no Airflow 1, modifique a seguinte opção de configuração do Airflow:

Seção Chave Valor Observações
api auth_backend airflow.api.auth.backend.default O padrão é airflow.api.auth.backend.deny_all

Depois de definir a opção de configuração api-auth_backend como airflow.api.auth.backend.default, o servidor da Web do Airflow aceita todas as solicitações de API sem autenticação. Mesmo que o próprio servidor da Web do Airflow não exija autenticação, ele ainda está protegido pelo Identity-Aware Proxy, que fornece a própria camada de autenticação.

Fazer chamadas para a API REST do Airflow

Faça as seguintes substituições:

  • Substitua o valor da variável client_id pelo valor client_id recebido em uma etapa anterior.
  • Substitua o valor da variável webserver_id pelo ID do projeto de locatário, que faz parte do URL da interface da Web do Airflow antes de .appspot.com. Você recebeu o URL da interface da Web do Airflow em uma etapa anterior.
  • Especifique a versão da API REST do Airflow usada:

    • Se você usa a API REST estável do Airflow, defina a variável USE_EXPERIMENTAL_API como False.
    • Se você usa a API REST experimental do Airflow, não é necessário fazer alterações. A variável USE_EXPERIMENTAL_API já está definida como True.

from google.auth.transport.requests import Request
from google.oauth2 import id_token
import requests

IAM_SCOPE = 'https://www.googleapis.com/auth/iam'
OAUTH_TOKEN_URI = 'https://www.googleapis.com/oauth2/v4/token'
# If you are using the stable API, set this value to False
# For more info about Airflow APIs see https://cloud.google.com/composer/docs/access-airflow-api
USE_EXPERIMENTAL_API = True

def trigger_dag(data, context=None):
    """Makes a POST request to the Composer DAG Trigger API

    When called via Google Cloud Functions (GCF),
    data and context are Background function parameters.

    For more info, refer to
    https://cloud.google.com/functions/docs/writing/background#functions_background_parameters-python

    To call this function from a Python script, omit the ``context`` argument
    and pass in a non-null value for the ``data`` argument.

    This function is currently only compatible with Composer v1 environments.
    """

    # Fill in with your Composer info here
    # Navigate to your webserver's login page and get this from the URL
    # Or use the script found at
    # https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/composer/rest/get_client_id.py
    client_id = 'YOUR-CLIENT-ID'
    # This should be part of your webserver's URL:
    # {tenant-project-id}.appspot.com
    webserver_id = 'YOUR-TENANT-PROJECT'
    # The name of the DAG you wish to trigger
    dag_name = 'composer_sample_trigger_response_dag'

    if USE_EXPERIMENTAL_API:
        endpoint = f'api/experimental/dags/{dag_name}/dag_runs'
        json_data = {'conf': data, 'replace_microseconds': 'false'}
    else:
        endpoint = f'api/v1/dags/{dag_name}/dagRuns'
        json_data = {'conf': data}
    webserver_url = (
        'https://'
        + webserver_id
        + '.appspot.com/'
        + endpoint
    )
    # Make a POST request to IAP which then Triggers the DAG
    make_iap_request(
        webserver_url, client_id, method='POST', json=json_data)

# This code is copied from
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/iap/make_iap_request.py
# START COPIED IAP CODE
def make_iap_request(url, client_id, method='GET', **kwargs):
    """Makes a request to an application protected by Identity-Aware Proxy.
    Args:
      url: The Identity-Aware Proxy-protected URL to fetch.
      client_id: The client ID used by Identity-Aware Proxy.
      method: The request method to use
              ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                If no timeout is provided, it is set to 90 by default.
    Returns:
      The page body, or raises an exception if the page couldn't be retrieved.
    """
    # Set the default timeout, if missing
    if 'timeout' not in kwargs:
        kwargs['timeout'] = 90

    # Obtain an OpenID Connect (OIDC) token from metadata server or using service
    # account.
    google_open_id_connect_token = id_token.fetch_id_token(Request(), client_id)

    # Fetch the Identity-Aware Proxy-protected URL, including an
    # Authorization header containing "Bearer " followed by a
    # Google-issued OpenID Connect token for the service account.
    resp = requests.request(
        method, url,
        headers={'Authorization': 'Bearer {}'.format(
            google_open_id_connect_token)}, **kwargs)
    if resp.status_code == 403:
        raise Exception('Service account does not have permission to '
                        'access the IAP-protected application.')
    elif resp.status_code != 200:
        raise Exception(
            'Bad response from application: {!r} / {!r} / {!r}'.format(
                resp.status_code, resp.headers, resp.text))
    else:
        return resp.text
# END COPIED IAP CODE

A seguir